refactor: simplify loader and execution boundaries #2034
📝 Walkthrough
This PR refactors internal control flows and helper methods across multiple subsystems: socket communication (client error handling, server buffering), mail sending (unified SMTP sequence), DAG loading (pipeline extraction), persistence (state machine simplification), and scheduler/agent execution (goroutine and timing logic). Many changes introduce intermediate helpers and adjust error/state handling paths while preserving external APIs.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
🚥 Pre-merge checks | ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/runtime/node.go (1)
626-655: ⚠️ Potential issue | 🟠 Major
Keep the abort decision aligned with the signal actually sent.
`allowOverride` can change the signal delivered to the child process, but the aborted-state check still uses the original `sig`. That can leave the node status out of sync with what was actually sent.
Suggested fix:

     func (n *Node) Signal(ctx context.Context, sig os.Signal, allowOverride bool) {
         n.mu.Lock()
         defer n.mu.Unlock()
         status := n.Status()
    +    killSignal := n.signalToSend(sig, allowOverride)
         if status == core.NodeRunning && n.cmd != nil {
    -        killSignal := n.signalToSend(sig, allowOverride)
             logger.Info(ctx, "Sending signal",
                 tag.Signal(killSignal.String()),
                 tag.Step(n.Name()),
             )
    @@
    -    if status == core.NodeRunning && signal.IsTerminationSignalOS(sig) {
    +    if status == core.NodeRunning && signal.IsTerminationSignalOS(killSignal) {
             n.SetStatus(core.NodeAborted)
         }
     }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/node.go` around lines 626 - 655, The abort decision uses the original sig instead of the actual signal sent when allowOverride is true; update Node.Signal to compute killSignal via signalToSend (as already done) and use that killSignal when checking termination (i.e., call signal.IsTerminationSignalOS(killSignal)) so the SetStatus(core.NodeAborted) decision reflects the signal actually delivered to the child process (refer to Node.Signal and signalToSend).
internal/service/frontend/templates/base.gohtml (1)
11-61: ⚠️ Potential issue | 🔴 Critical
Change breaks global scope access for `getConfig`.
Changing from a function declaration to a `const` arrow function removes the `window.getConfig` property. The deferred `bundle.js` script loads after this inline script and won't have access to a const-scoped variable. The TypeScript declarations (`declare function getConfig()`) indicate the code expects global scope availability. Either restore the function declaration or ensure all callers import `getConfig` explicitly from a module.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/templates/base.gohtml` around lines 11 - 61, The change replaced the global function getConfig() with a block-scoped const arrow getConfig, which prevents window.getConfig from being available to the deferred bundle.js and violates the existing TypeScript declaration (declare function getConfig()). Restore global availability by reverting to a function declaration (e.g., function getConfig() { ... }) or explicitly attach the function to the global object (window.getConfig = getConfig) so callers (and bundle.js) can access getConfig globally; ensure the identifier getConfig remains unchanged so the TS declaration still matches.
🧹 Nitpick comments (2)
internal/service/scheduler/scheduler.go (1)
705-705: Please add a minute-boundary characterization test for `NextTick`.
Current coverage (in `internal/service/scheduler/scheduler_test.go`, lines 124-133) checks `01:00:50 -> 01:01:00`. Add `01:00:00 -> 01:01:00` (and ideally a DST transition case) to lock in intended semantics.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/scheduler/scheduler.go` at line 705, Add a unit test for NextTick that asserts a time already on a minute boundary advances to the next minute (e.g., input 01:00:00 -> expected 01:01:00) and include an additional test that covers a DST boundary if possible; locate the scheduler.NextTick call (function that returns now.Truncate(time.Minute).Add(time.Minute)) and in internal/service/scheduler/scheduler_test.go add a test case constructing the source time (use time.Date with the appropriate location/zone for DST) and assert equality with the expected next-minute value.
internal/core/step.go (1)
191-196: Copy legacy args slice when constructing `CommandEntry`.
At line 194, `Args: s.Args` reuses the same backing array. A later mutation of the deprecated `Step.Args` can unintentionally affect `Commands[0].Args`.
Suggested patch:

     func (s *Step) legacyCommandEntry() CommandEntry {
         return CommandEntry{
             Command:     s.Command,
    -        Args:        s.Args,
    +        Args:        append([]string(nil), s.Args...),
             CmdWithArgs: s.CmdWithArgs,
         }
     }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/step.go` around lines 191 - 196, The legacyCommandEntry method is returning CommandEntry with Args pointing to the same backing slice as Step.Args which allows later mutations to Step.Args to affect the stored Commands; update legacyCommandEntry (the function named legacyCommandEntry that returns CommandEntry) to copy the slice contents when setting Args (e.g., allocate a new []string and copy or use append to copy) so CommandEntry.Args is independent of Step.Args.
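The aliasing hazard in the `legacyCommandEntry` comment can be reproduced in isolation. The following is a minimal sketch, not the project's code: `Step` and `CommandEntry` are cut down to two fields, and `aliasedEntry` / `copiedEntry` are hypothetical helpers that contrast sharing the backing array with copying it.

```go
package main

import "fmt"

// Step and CommandEntry are reduced, illustrative stand-ins for the real types.
type Step struct {
	Command string
	Args    []string
}

type CommandEntry struct {
	Command string
	Args    []string
}

// aliasedEntry shares the backing array of s.Args with the returned entry.
func aliasedEntry(s *Step) CommandEntry {
	return CommandEntry{Command: s.Command, Args: s.Args}
}

// copiedEntry gives the entry its own backing array (a nil slice stays nil).
func copiedEntry(s *Step) CommandEntry {
	return CommandEntry{Command: s.Command, Args: append([]string(nil), s.Args...)}
}

func main() {
	s := &Step{Command: "echo", Args: []string{"hello", "world"}}
	aliased := aliasedEntry(s)
	copied := copiedEntry(s)

	// A later mutation of the original slice.
	s.Args[0] = "mutated"

	fmt.Println(aliased.Args[0]) // observes the mutation
	fmt.Println(copied.Args[0])  // still the original value
}
```

Only element writes (and appends within capacity) leak through the alias; reassigning `s.Args` to a new slice would not, which is part of why this kind of bug tends to surface late.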
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/mailer/mailer.go`:
- Around line 141-143: Sanitize MIME header fields to prevent CR/LF header
injection by stripping CR/LF and other illegal characters before composing
headers; specifically, when building the MIME headers that use the raw from and
subject values (the places around the header composition that reference the
variables created from replacer.Replace(from) and subject), apply the same
sanitization (or a dedicated sanitizeHeaderField function) to from, subject and
any other header values instead of only sanitizing SMTP envelope recipients via
sanitizeAddresses(to), and use the sanitized values when calling c.Mail and when
writing the MIME header lines so header injection cannot occur.
- Around line 164-166: The deferred call to wc.Close() should be removed to
avoid double-closing the DATA writer; instead keep the explicit close already
present (the wc.Close() invoked in the success/error path) and ensure its
returned error is handled and propagated (or logged) appropriately. Locate the
DATA writer variable wc in the mail sending function (the block that writes to
the SMTP DATA writer) and delete the defer func() { _ = wc.Close() }(), keeping
the existing explicit wc.Close() call and replacing any ignored error handling
there with proper error capture and return (or processLogger/error return) so
protocol-level close errors are not swallowed.
- Around line 135-139: The STARTTLS negotiation is currently inside the useAuth
branch; move the STARTTLS/TLS upgrade logic so it runs unconditionally before
any authentication attempts, and keep only the authentication call
(enableAuth(ctx, c)) guarded by if useAuth. Concretely, extract or relocate the
STARTTLS negotiation block (the lines performing STARTTLS and TLS config) to
execute prior to the conditional that calls m.enableAuth(ctx, c), preserve
existing error handling and context/connection variables (ctx, c), and ensure
only the authentication step remains inside if useAuth.
In `@internal/cmn/sock/server.go`:
- Around line 173-186: The flush() function in type httpResponseWriter currently
skips sending a response when wroteBody is false, dropping WriteHeader-only
responses; change flush so it always constructs and writes an http.Response
(using w.statusCode, a cloned w.header, and an io.NopCloser over the current
body bytes) even when w.body is empty — set ContentLength to int64(w.body.Len())
(which will be 0 for header-only responses) and call response.Write(w.conn) in
all cases; keep existing behavior for when a body was written but remove the
early return that prevents emitting headers/status.
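As a rough illustration of the `flush()` item above, the sketch below always serialises an `http.Response`, even when no body bytes were written. `bufferedResponse`, `newBufferedResponse`, and `roundTrip` are hypothetical names; the real `httpResponseWriter` writes to a connection and may differ in detail.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// bufferedResponse is a hypothetical stand-in for the buffering writer:
// it records status, headers and body until flush is called.
type bufferedResponse struct {
	statusCode int
	header     http.Header
	body       bytes.Buffer
}

func newBufferedResponse(status int) *bufferedResponse {
	return &bufferedResponse{statusCode: status, header: http.Header{}}
}

// flush always writes a response; a header-only response has ContentLength 0.
func (w *bufferedResponse) flush(dst io.Writer) error {
	status := w.statusCode
	if status == 0 {
		status = http.StatusOK
	}
	resp := &http.Response{
		StatusCode:    status,
		ProtoMajor:    1,
		ProtoMinor:    1,
		Header:        w.header.Clone(),
		Body:          io.NopCloser(bytes.NewReader(w.body.Bytes())),
		ContentLength: int64(w.body.Len()),
	}
	return resp.Write(dst)
}

// roundTrip flushes w into memory and parses the bytes back, as a client would.
func roundTrip(w *bufferedResponse) (int, string, error) {
	var wire bytes.Buffer
	if err := w.flush(&wire); err != nil {
		return 0, "", err
	}
	resp, err := http.ReadResponse(bufio.NewReader(&wire), nil)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(body), err
}

func main() {
	code, body, err := roundTrip(newBufferedResponse(http.StatusForbidden))
	fmt.Println(code, len(body), err)
}
```

Round-tripping through `http.ReadResponse` is a cheap way to check that a status-only reply survives serialisation without needing a socket.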
In `@internal/core/spec/loader.go`:
- Around line 626-641: validateUniqueNames currently only checks names among
dags[1:], allowing a child DAG to collide with dags[0].Name; fix by validating
and registering the main DAG's name before the loop: ensure dags[0].Name is
non-empty when len(dags) >= 2, add it to the names map, then iterate over
dags[1:] as now and detect duplicates against that map (and still reject empty
child names). Reference validateUniqueNames, dags[0].Name, and the loop over
dags[1:].
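The `validateUniqueNames` item above could look roughly like this. It is a sketch under assumptions: `DAG` is reduced to a `Name` field, the error values are invented for the example, and `check` is a hypothetical helper for exercising the function.

```go
package main

import (
	"errors"
	"fmt"
)

// DAG is a reduced stand-in; only the Name field matters here.
type DAG struct{ Name string }

// Illustrative error values; the real loader may report these differently.
var (
	errEmptyName     = errors.New("DAG name must not be empty")
	errDuplicateName = errors.New("duplicate DAG name")
)

// validateUniqueNames registers the main DAG's name first so that a child
// DAG cannot reuse it, then checks every child against the same set.
func validateUniqueNames(dags []*DAG) error {
	if len(dags) < 2 {
		return nil // a single document has nothing to collide with
	}
	if dags[0].Name == "" {
		return fmt.Errorf("main DAG: %w", errEmptyName)
	}
	names := map[string]struct{}{dags[0].Name: {}}
	for i, d := range dags[1:] {
		if d.Name == "" {
			return fmt.Errorf("child DAG %d: %w", i+1, errEmptyName)
		}
		if _, seen := names[d.Name]; seen {
			return fmt.Errorf("%q: %w", d.Name, errDuplicateName)
		}
		names[d.Name] = struct{}{}
	}
	return nil
}

// check builds DAGs from plain names and validates them.
func check(names ...string) error {
	dags := make([]*DAG, 0, len(names))
	for _, n := range names {
		dags = append(dags, &DAG{Name: n})
	}
	return validateUniqueNames(dags)
}

func main() {
	fmt.Println(check("main", "child"))
	fmt.Println(check("main", "main"))
}
```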
In `@internal/runtime/agent/reporter.go`:
- Around line 145-160: The statusBadgeClass function is missing a branch for
"partially_succeeded", causing DAG badges to be unstyled; add a case for
"partially_succeeded" in statusBadgeClass and return the same badge class string
that statusToClass maps that status to so the DAG badge styling matches the rest
of the UI (modify the statusBadgeClass switch to include case
"partially_succeeded": return "<same-class-as-statusToClass>").
In `@internal/runtime/node.go`:
- Around line 657-663: The Cancel method only transitions nodes to NodeAborted
when Status() == core.NodeRunning, leaving queued/waiting nodes able to start
later; update Node.Cancel to also handle queued/waiting states (e.g., check for
core.NodeQueued and core.NodeWaiting in addition to core.NodeRunning) and call
SetStatus(core.NodeAborted) for those states under the existing mu lock, or
alternatively add a clear comment on Node.Cancel documenting that it only aborts
running nodes; reference the Node.Cancel, Node.Status, and SetStatus methods and
the core.NodeRunning/core.NodeAborted (and core.NodeQueued/core.NodeWaiting)
status constants when making the change and update relevant tests.
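One way to read the `Cancel` item above is as an explicit switch over the states that may transition to aborted. The sketch below uses invented `NodeStatus` constants and a reduced `Node`; which states the real method should cover (queued, waiting, or only running) is the open question the comment raises, so treat the chosen set as an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeStatus and its constants are illustrative, not the project's core package.
type NodeStatus int

const (
	NodeNotStarted NodeStatus = iota
	NodeWaiting
	NodeRunning
	NodeSucceeded
	NodeAborted
)

type Node struct {
	mu     sync.Mutex
	status NodeStatus
}

// Cancel aborts nodes that are running or waiting and leaves every other
// state untouched, so finished nodes keep their final status.
func (n *Node) Cancel() {
	n.mu.Lock()
	defer n.mu.Unlock()
	switch n.status {
	case NodeRunning, NodeWaiting:
		n.status = NodeAborted
	default:
		// Not cancellable in this sketch: keep the current status.
	}
}

// Status returns the current status under the lock.
func (n *Node) Status() NodeStatus {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.status
}

// cancelFrom reports the status a node ends in when cancelled from initial.
func cancelFrom(initial NodeStatus) NodeStatus {
	n := &Node{status: initial}
	n.Cancel()
	return n.Status()
}

func main() {
	fmt.Println(cancelFrom(NodeWaiting) == NodeAborted)
	fmt.Println(cancelFrom(NodeSucceeded) == NodeSucceeded)
}
```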
In `@internal/service/scheduler/scheduler.go`:
- Around line 682-689: waitForTick currently returns false on OS signal without
calling s.Stop(ctx), which leaves startHeartbeat running (it only exits on
<-s.quit) and blocks Start() on wg.Wait(); modify waitForTick so that when a
termination signal is received it invokes s.Stop(ctx) before returning false,
ensuring s.quit is closed and all goroutines (including startHeartbeat) can
exit; locate the signal case in waitForTick and call the Scheduler.Stop method
with the provided ctx, preserving existing return behavior after the call.
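The shutdown ordering in the `waitForTick` item can be sketched as follows. This is a simplified model, not the scheduler's code: `Stop` takes no context here, the heartbeat does no work, and `runUntilSignal` is a hypothetical driver that simulates a termination signal and checks that the heartbeat goroutine drains.

```go
package main

import (
	"fmt"
	"os"
	"sync"
	"syscall"
	"time"
)

type Scheduler struct {
	quit     chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
}

func NewScheduler() *Scheduler { return &Scheduler{quit: make(chan struct{})} }

// Stop closes quit exactly once so every goroutine selecting on it can exit.
func (s *Scheduler) Stop() { s.stopOnce.Do(func() { close(s.quit) }) }

// startHeartbeat exits only when quit is closed, as described above.
func (s *Scheduler) startHeartbeat(interval time.Duration) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
			case <-s.quit:
				return
			}
		}
	}()
}

// waitForTick reports whether the loop should continue. On a signal it
// calls Stop before returning false, so wg.Wait cannot block forever.
func (s *Scheduler) waitForTick(sig <-chan os.Signal, tick <-chan time.Time) bool {
	select {
	case <-tick:
		return true
	case <-sig:
		s.Stop()
		return false
	case <-s.quit:
		return false
	}
}

// runUntilSignal simulates SIGTERM and reports whether all goroutines drained.
func runUntilSignal() bool {
	s := NewScheduler()
	s.startHeartbeat(time.Millisecond)
	sig := make(chan os.Signal, 1)
	sig <- syscall.SIGTERM
	if s.waitForTick(sig, nil) {
		return false
	}
	done := make(chan struct{})
	go func() { s.wg.Wait(); close(done) }()
	select {
	case <-done:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println(runUntilSignal())
}
```

Guarding the close with `sync.Once` keeps `Stop` safe to call from both the signal path and an external caller.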
---
Outside diff comments:
In `@internal/runtime/node.go`:
- Around line 626-655: The abort decision uses the original sig instead of the
actual signal sent when allowOverride is true; update Node.Signal to compute
killSignal via signalToSend (as already done) and use that killSignal when
checking termination (i.e., call signal.IsTerminationSignalOS(killSignal)) so
the SetStatus(core.NodeAborted) decision reflects the signal actually delivered
to the child process (refer to Node.Signal and signalToSend).
In `@internal/service/frontend/templates/base.gohtml`:
- Around line 11-61: The change replaced the global function getConfig() with a
block-scoped const arrow getConfig, which prevents window.getConfig from being
available to the deferred bundle.js and violates the existing TypeScript
declaration (declare function getConfig()). Restore global availability by
reverting to a function declaration (e.g., function getConfig() { ... }) or
explicitly attach the function to the global object (window.getConfig =
getConfig) so callers (and bundle.js) can access getConfig globally; ensure the
identifier getConfig remains unchanged so the TS declaration still matches.
---
Nitpick comments:
In `@internal/core/step.go`:
- Around line 191-196: The legacyCommandEntry method is returning CommandEntry
with Args pointing to the same backing slice as Step.Args which allows later
mutations to Step.Args to affect the stored Commands; update legacyCommandEntry
(the function named legacyCommandEntry that returns CommandEntry) to copy the
slice contents when setting Args (e.g., allocate a new []string and copy or use
append to copy) so CommandEntry.Args is independent of Step.Args.
In `@internal/service/scheduler/scheduler.go`:
- Line 705: Add a unit test for NextTick that asserts a time already on a minute
boundary advances to the next minute (e.g., input 01:00:00 -> expected 01:01:00)
and include an additional test that covers a DST boundary if possible; locate
the scheduler.NextTick call (function that returns
now.Truncate(time.Minute).Add(time.Minute)) and in
internal/service/scheduler/scheduler_test.go add a test case constructing the
source time (use time.Date with the appropriate location/zone for DST) and
assert equality with the expected next-minute value.
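For reference, the boundary behaviour requested for `NextTick` can be checked with a few lines. The function body follows the expression quoted above (`now.Truncate(time.Minute).Add(time.Minute)`); `clock` and `hhmmss` are hypothetical helpers, and the example uses UTC only, so it does not cover the DST case.

```go
package main

import (
	"fmt"
	"time"
)

// NextTick returns the start of the minute following now. A time that is
// already on a minute boundary still advances by one full minute.
func NextTick(now time.Time) time.Time {
	return now.Truncate(time.Minute).Add(time.Minute)
}

// clock builds a UTC time on a fixed date for readability.
func clock(h, m, s int) time.Time {
	return time.Date(2020, 1, 1, h, m, s, 0, time.UTC)
}

// hhmmss formats only the wall-clock part of t.
func hhmmss(t time.Time) string { return t.Format("15:04:05") }

func main() {
	for _, in := range []time.Time{clock(1, 0, 50), clock(1, 0, 0), clock(23, 59, 59)} {
		fmt.Println(hhmmss(in), "->", hhmmss(NextTick(in)))
	}
}
```

`Truncate` works on absolute time rather than the wall-clock presentation, so a DST test is mainly a guard against someone later replacing it with wall-clock arithmetic.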
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b26f2ce5-1d71-435d-ade3-233b92aeb911
📒 Files selected for processing (19)
- internal/cmn/mailer/mailer.go
- internal/cmn/sock/client.go
- internal/cmn/sock/server.go
- internal/cmn/sock/server_test.go
- internal/core/condition.go
- internal/core/dag.go
- internal/core/spec/loader.go
- internal/core/spec/loader_test.go
- internal/core/step.go
- internal/persis/filedagrun/writer.go
- internal/persis/filedagrun/writer_test.go
- internal/runtime/agent/agent.go
- internal/runtime/agent/reporter.go
- internal/runtime/node.go
- internal/service/frontend/templates.go
- internal/service/frontend/templates/base.gohtml
- internal/service/frontend/templates/index.gohtml
- internal/service/scheduler/scheduler.go
- internal/testdata/runtime/runner/testfile.sh
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/cmn/sock/server_test.go (1)
92-95: ⚠️ Potential issue | 🟡 Minor
Goroutine leak: `Shutdown` is never called externally, so `Serve` blocks forever.
The `Shutdown` call inside the goroutine is unreachable: it only runs after `Serve` returns, and `Serve` only returns when `Shutdown` is called. Since no outside caller invokes `Shutdown`, the server goroutine stays parked on `Accept()` for the rest of the test binary's lifetime, and `Serve`'s return value is never asserted. `TestEmptyResponse` (lines 140–143) has the same pattern. Please mirror the clean pattern used by `TestMultipleWritesProduceSingleResponseBody` / `TestStartAndShutdownServer`.
🧪 Suggested fix for `TestHeaderOnlyResponse`:

         listen := make(chan error, 1)
    +    done := make(chan error, 1)
         go func() {
    -        err = unixServer.Serve(context.Background(), listen)
    -        _ = unixServer.Shutdown(context.Background())
    +        done <- unixServer.Serve(context.Background(), listen)
         }()
         // Wait for the server to signal it is ready.
         require.NoError(t, <-listen)
    @@
         require.Equal(t, http.StatusForbidden, response.StatusCode)
         require.Empty(t, body)
    +
    +    require.NoError(t, unixServer.Shutdown(context.Background()))
    +    require.True(t, errors.Is(<-done, sock.ErrServerRequestedShutdown))
     }

Apply the equivalent change to `TestEmptyResponse`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server_test.go` around lines 92 - 95, The goroutine in TestHeaderOnlyResponse/TestEmptyResponse starts unixServer.Serve(ctx, listen) and then calls unixServer.Shutdown after Serve returns, which never happens because Shutdown is never invoked externally; this leaks a goroutine. Fix by mirroring the pattern from TestMultipleWritesProduceSingleResponseBody/TestStartAndShutdownServer: start Serve in a goroutine but control shutdown from the test by canceling or calling unixServer.Shutdown(ctx) from the test (or using a cancellable context passed into Serve) and wait for Serve to return (e.g., via a sync.WaitGroup or channel) so you can assert its error; update the goroutine that calls unixServer.Serve to expose a way for the test to trigger unixServer.Shutdown and to wait for Serve's completion.
🧹 Nitpick comments (12)
internal/runtime/agent/reporter_test.go (1)
456-460: Consider expanding to table-driven coverage for `statusBadgeClass`.
The new test only exercises the `"partially_succeeded"` branch. Since `statusBadgeClass` has 9 distinct branches (including the `default` empty-string case), a table-driven test would provide stronger regression protection at minimal cost.
♻️ Proposed table-driven version:

    -func TestStatusBadgeClassPartiallySucceeded(t *testing.T) {
    -    t.Parallel()
    -
    -    require.Equal(t, "success", statusBadgeClass("partially_succeeded"))
    -}
    +func TestStatusBadgeClass(t *testing.T) {
    +    t.Parallel()
    +
    +    cases := map[string]string{
    +        "finished":            "success",
    +        "succeeded":           "success",
    +        "partially_succeeded": "success",
    +        "failed":              "failed",
    +        "running":             "running",
    +        "skipped":             "skipped",
    +        "aborted":             "aborted",
    +        "waiting":             "wait",
    +        "unknown":             "",
    +    }
    +    for status, want := range cases {
    +        status, want := status, want
    +        t.Run(status, func(t *testing.T) {
    +            t.Parallel()
    +            require.Equal(t, want, statusBadgeClass(status))
    +        })
    +    }
    +}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/reporter_test.go` around lines 456 - 460, The test TestStatusBadgeClassPartiallySucceeded only checks one branch; change it to a table-driven test that enumerates all known status inputs and expected outputs for statusBadgeClass (including all 9 distinct branches and the default/empty-string case), iterate the table with t.Run subtests (calling t.Parallel() per subtest if desired) and assert each mapping with require.Equal, so the behavior of statusBadgeClass is fully covered; keep the test named to reflect table-driven style (e.g., TestStatusBadgeClass) and reference statusBadgeClass in each subtest.
internal/core/step.go (1)
191-197: Optional: use `slices.Clone` for the defensive `Args` copy.
`append([]string(nil), s.Args...)` is correct (and yields `nil` when `s.Args` is `nil`, which is fine here), but `slices.Clone(s.Args)` from the standard library is more idiomatic and reads more directly as "defensive copy". Purely a readability preference; feel free to skip.
♻️ Proposed refactor:

     func (s *Step) legacyCommandEntry() CommandEntry {
         return CommandEntry{
             Command:     s.Command,
    -        Args:        append([]string(nil), s.Args...),
    +        Args:        slices.Clone(s.Args),
             CmdWithArgs: s.CmdWithArgs,
         }
     }

Requires adding `"slices"` to the import block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/step.go` around lines 191 - 197, Replace the manual defensive copy of s.Args in Step.legacyCommandEntry with the idiomatic slices.Clone call: use slices.Clone(s.Args) for the Args field, and add "slices" to the imports; keep the rest of the returned CommandEntry (Command and CmdWithArgs) unchanged to preserve behavior when s.Args is nil.
internal/cmn/mailer/mailer.go (6)
347-364: Pre-existing duplicate `Content-Transfer-Encoding` header in attachments (out of scope, flagging for awareness).
Unrelated to your refactor, but each attachment block writes `Content-Transfer-Encoding: base64` twice: once at line 354 and again at line 359. Some strict MIME parsers reject duplicate `Content-Transfer-Encoding` headers per RFC 2045 §6.4. The base64 switch on line 360 itself is a clean simplification; consider removing one of the duplicate header lines in a follow-up.
♻️ Follow-up cleanup:

         _, _ = fmt.Fprintf(&buf, "\r\n\n--%s\r\n", boundary)
         _, _ = buf.WriteString("Content-Type: text/plain;" + "\r\n")
    -    _, _ = buf.WriteString("Content-Transfer-Encoding: base64" + "\r\n")
         _, _ = buf.WriteString(
             "Content-Disposition: attachment; filename=" + filepath.Base(fileName) + "\r\n",
         )
         _, _ = buf.WriteString("Content-Transfer-Encoding: base64\r\n\n")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 347 - 364, The attachment builder in addAttachments writes the "Content-Transfer-Encoding: base64" header twice for each attachment which can break strict MIME parsers; update addAttachments to emit that header only once per attachment (remove the first write call, as in the diff above, and keep the final one, which also terminates the header block before the base64 payload) while keeping the existing Content-Type, Content-Disposition, boundary usage, and base64 encoding of data intact so each attachment block contains a single Content-Transfer-Encoding header.
183-196: Helpers are fine; consider making `sanitizeHeaderField` a touch stricter.
`replacer` currently strips `\r`, `\n`, and their percent-encoded forms, which is sufficient for the CRLF-injection scenario the new test covers. If you want defense-in-depth against more exotic injection vectors, you could also strip other C0 controls (`\x00`–`\x1f` except tab, plus `\x7f`) and truncate absurdly long values. Not required: the current behavior matches the test contract.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 183 - 196, Update sanitizeHeaderField to be stricter: after applying replacer.Replace(value) also remove remaining C0 control characters (bytes 0x00–0x1F and 0x7F) except horizontal tab, and truncate the result to a reasonable max length (e.g., 256 chars) to avoid absurdly long header values; reference the sanitizeHeaderField function and the replacer variable so you locate the code, and keep sanitizeAddresses behavior unchanged.
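A stricter sanitizer along the lines suggested above might look like the following. It is only a sketch: `sanitizeHeaderFieldStrict` and `maxHeaderLen` are hypothetical names, the 256 limit is the example value from the comment, and the existing `replacer` step for percent-encoded forms is deliberately left out, so this would complement rather than replace it.

```go
package main

import (
	"fmt"
	"strings"
)

// maxHeaderLen is an assumed cap taken from the "e.g., 256 chars" suggestion.
const maxHeaderLen = 256

// sanitizeHeaderFieldStrict drops every C0 control character except
// horizontal tab, drops DEL (0x7f), and truncates overly long values.
// CR and LF are C0 controls, so CRLF injection is covered as well.
func sanitizeHeaderFieldStrict(value string) string {
	value = strings.Map(func(r rune) rune {
		if r == '\t' {
			return r
		}
		if r < 0x20 || r == 0x7f {
			return -1 // a negative return value removes the rune
		}
		return r
	}, value)

	// Truncate by runes so a multi-byte character is never split.
	if runes := []rune(value); len(runes) > maxHeaderLen {
		value = string(runes[:maxHeaderLen])
	}
	return value
}

func main() {
	fmt.Printf("%q\n", sanitizeHeaderFieldStrict("Hello\r\nBcc: evil@example.com"))
	fmt.Printf("%q\n", sanitizeHeaderFieldStrict("a\tb\x00c\x7f"))
}
```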
135-178: Inconsistent error wrapping between `useAuth` and no-auth paths.
Every SMTP step (`MAIL FROM`, `RCPT TO`, `DATA`, body write, `wc.Close`) wraps errors with a descriptive message only when `useAuth` is true, and returns the bare error otherwise. That asymmetry was inherited from the previous split functions, but now that both paths share one implementation there's no reason the no-auth path should produce worse diagnostics: operators will see `read tcp ... i/o timeout` instead of `MAIL FROM failed: ...`, making field triage of unauthenticated relay failures noticeably harder.
Also note `prepareSession` at line 135 always wraps errors (`HELO failed`, `STARTTLS failed`, `authentication failed`) regardless of `useAuth`, so the wrapping policy within this single function is already internally inconsistent.
Consider wrapping unconditionally and dropping the `useAuth` branches:
♻️ Proposed simplification:

    -    if err := c.Mail(safeFrom); err != nil {
    -        if useAuth {
    -            return fmt.Errorf("MAIL FROM failed: %w", err)
    -        }
    -        return err
    -    }
    +    if err := c.Mail(safeFrom); err != nil {
    +        return fmt.Errorf("MAIL FROM failed: %w", err)
    +    }
         for _, recipient := range recipients {
    -        if err := c.Rcpt(recipient); err != nil {
    -            if useAuth {
    -                return fmt.Errorf("RCPT TO failed: %w", err)
    -            }
    -            return err
    -        }
    +        if err := c.Rcpt(recipient); err != nil {
    +            return fmt.Errorf("RCPT TO failed: %w", err)
    +        }
         }
         wc, err := c.Data()
         if err != nil {
    -        if useAuth {
    -            return fmt.Errorf("DATA command failed: %w", err)
    -        }
    -        return err
    +        return fmt.Errorf("DATA command failed: %w", err)
         }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 135 - 178, The code currently only wraps SMTP errors (MAIL FROM, RCPT TO, DATA, write, Close) when useAuth is true, causing inconsistent/poorer diagnostics for non-auth paths; update the send flow (the block after prepareSession including calls to c.Mail, c.Rcpt, c.Data, wc.Write, wc.Close and the payload construction via composeMail/processEmailBody) to always wrap returned errors with descriptive context (e.g., "MAIL FROM failed", "RCPT TO failed", "DATA command failed", "failed to write email body", "failed to close data writer") and remove the conditional branches that return the bare err based on useAuth so error wrapping is consistent with prepareSession.
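The unconditional wrapping proposed above does not cost callers anything, because `%w` keeps the underlying error reachable through `errors.Is`. A small sketch, with `errTimeout`, `step`, `sendSketch`, and `describe` all invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout stands in for a transport-level error such as an i/o timeout.
var errTimeout = errors.New("i/o timeout")

// step runs fn and wraps any error with the SMTP step name, regardless of
// whether authentication is in use.
func step(name string, fn func() error) error {
	if err := fn(); err != nil {
		return fmt.Errorf("%s failed: %w", name, err)
	}
	return nil
}

// sendSketch runs the steps in order and stops at the first failure.
// failAt names the step that should fail; an empty string means success.
func sendSketch(failAt string) error {
	for _, name := range []string{"MAIL FROM", "RCPT TO", "DATA command"} {
		name := name
		err := step(name, func() error {
			if name == failAt {
				return errTimeout
			}
			return nil
		})
		if err != nil {
			return err
		}
	}
	return nil
}

// describe returns the error message and whether the cause is still detectable.
func describe(failAt string) (string, bool) {
	err := sendSketch(failAt)
	if err == nil {
		return "", false
	}
	return err.Error(), errors.Is(err, errTimeout)
}

func main() {
	msg, isTimeout := describe("RCPT TO")
	fmt.Println(msg, isTimeout)
}
```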
309-324: `composeMail` switch to `bytes.Buffer` is cleaner; minor redundant CRLF sequence.
Moving to `bytes.Buffer` + `base64.StdEncoding.EncodeToString` is a nice simplification over the prior byte-slice juggling. One small thing: lines 319–322 emit `"\r\n\r\n--"`, `boundary`, `"--\r\n\r\n"`, then another `"\r\n\r\n"`. The trailing extra CRLF pair is probably leftover from the old formatting; it is not harmful, but redundant for a multipart epilogue. You can drop line 322 if you want a tighter payload.
♻️ Optional tightening:

         buf.WriteString("\r\n\r\n--")
         buf.WriteString(boundary)
         buf.WriteString("--\r\n\r\n")
    -    buf.WriteString("\r\n\r\n")
         return buf.Bytes()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 309 - 324, composeMail assembles the multipart payload but writes an extra trailing CRLF pair after closing the final boundary; remove the final buf.WriteString("\r\n\r\n") so the epilogue ends with "\r\n\r\n--" + boundary + "--\r\n\r\n" (i.e., keep composeHeader, base64 encoding of body, addAttachments and the boundary close but drop the redundant trailing CRLF write) to produce a tighter multipart payload.
292-307: Double sanitization is harmless but makes the contract ambiguous.
`composeHeader` now sanitizes `to`, `from`, and `subject` internally (lines 295–297), but callers in `send` (line 165) already pass pre-sanitized values (`safeFrom`, `safeSubject`, and `recipients` from `sanitizeAddresses`). `replacer.Replace` is idempotent, so this is correct; however, having sanitization happen in two places makes it unclear which layer owns the invariant.
Preference: keep sanitization in `composeHeader` (defense-in-depth at the lowest layer closest to the header string) and simplify the call site in `send` to pass the raw values, or vice versa: remove it from `composeHeader` and rely on the caller. Either is fine; picking one avoids the "which layer is authoritative?" question for future readers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 292 - 307, composeHeader currently re-sanitizes to/from/subject even though send already sanitizes them; keep sanitization in composeHeader (defense-in-depth) and simplify send by passing raw values instead of pre-sanitized ones: in the send function stop calling sanitizeAddresses for the recipients passed into composeHeader (remove creation/use of recipients variable derived from sanitizeAddresses) and pass the original to/from/subject variables (remove safeFrom/safeSubject usage), leaving composeHeader to call sanitizeAddresses and sanitizeHeaderField as it currently does; update any related variable names in send to reflect raw inputs and ensure tests/other callers still supply unsanitized strings if they relied on previous behavior.
80-100: These are test-only helpers; consider moving to `*_test.go` for clarity.
Both `sendWithNoAuth` and `sendWithAuth` are called exclusively from tests (`mailer_test.go`). Since they're internal test seams with no production callers, moving them to the test file would make their intent clearer and avoid confusion about whether they're part of the public API.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer.go` around lines 80 - 100, sendWithNoAuth and sendWithAuth are test-only helpers and should be moved into the test file to clarify intent: remove these two functions from mailer.go and paste them into mailer_test.go (keeping them unexported and in the same package so they can access Client and mailTimeout), update any test imports or references if necessary, and run tests to ensure visibility of Client, mailTimeout, and the methods remain correct when moved to the _test.go file.
internal/cmn/mailer/mailer_test.go (1)
203-221: LGTM: good coverage for header injection prevention.
The test correctly validates that CRLF sequences in `to`/`from`/`subject` are stripped so they cannot split out new header lines, and it pins the resulting "flattened" form. Parallel execution is appropriate since this path has no shared state.
One small observation: this only exercises `composeMail` directly. Consider also adding a characterization around `Send`'s `recipients`/`safeFrom`/`safeSubject` wiring (e.g., via the existing mock SMTP server) so future refactors of the `send` pipeline can't reintroduce an injection path upstream of `composeMail`. Non-blocking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/mailer/mailer_test.go` around lines 203 - 221, The current test only exercises composeMail directly; add an integration-style test that exercises Send to ensure its recipients, safeFrom, and safeSubject plumbing can't reintroduce header injection upstream of composeMail: create a test (e.g., TestSendSanitizesHeaders) that initializes the mailer via New(Config{}), starts the existing mock SMTP server used in other tests, calls client.Send with malicious values for to/from/subject containing CRLF and injected headers, and then inspect the SMTP-received payload to assert it does not contain CRLF-injected header lines and instead contains the flattened forms (reference composeMail, Send, New, Config to locate code paths to validate).
internal/cmn/sock/server.go (1)
78-109: Consider tracking in-flight `serveConn` goroutines for graceful shutdown.
`Shutdown` closes the listener and returns immediately; any goroutines spawned at line 86 keep running with no way for callers to wait for them to drain. A panic in the handler also tears down the whole process since `serveConn` has no recover. Both are pre-existing traits carried over by the refactor, but this PR is a natural point to tighten them up if desired.
♻️ Sketch: WaitGroup + panic guard

     type Server struct {
         addr        string
         handlerFunc HTTPHandlerFunc
         listener    net.Listener
         quit        atomic.Bool
         mu          sync.Mutex
    +    wg          sync.WaitGroup
     }
    @@
    -        go srv.serveConn(ctx, conn)
    +        srv.wg.Add(1)
    +        go srv.serveConn(ctx, conn)
         }
     }
    @@
     func (srv *Server) serveConn(ctx context.Context, conn net.Conn) {
    +    defer srv.wg.Done()
         defer func() {
    +        if r := recover(); r != nil {
    +            logger.Error(ctx, "panic in serveConn", tag.Any("panic", r))
    +        }
             _ = conn.Close()
         }()

And have `Shutdown` (or `Serve`'s deferred cleanup) call `srv.wg.Wait()` before returning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 78 - 109, The server currently spawns serveConn goroutines that aren't tracked and lack panic recovery, so Shutdown/Serve can't wait for them and a handler panic can crash the process; add a sync.WaitGroup field (e.g., srv.wg) to increment in Serve before launching each goroutine and defer wg.Done() inside serveConn, add a defer recover() wrapper at the top of serveConn to log and swallow panics, and make Shutdown (or Serve's deferred cleanup) call srv.wg.Wait() before returning to ensure graceful drain.
internal/runtime/node.go (1)
541-557: Guard against emptysubRunsin the non-parallel branch.
runParams(subRuns[0])at Line 547 will panic ifsubRunsis empty. TodayBuildSubDAGRunsalways returns at least one element for the non-parallel path and validateslen(items) > 0for parallel, so this is safe in practice — but the helper now silently relies on that invariant. Consider an explicit guard so a future change toBuildSubDAGRunscan't turn this into a hard crash insidesetupExecutor.🛡️ Proposed defensive guard
func (n *Node) configureSubDAGExecutor(cmd executor.Executor, subRuns []SubDAGRun) error { + if len(subRuns) == 0 { + return fmt.Errorf("no sub DAG runs to configure for step %q", n.Name()) + } if n.Step().Parallel == nil { dagExecutor, ok := cmd.(executor.DAGExecutor) if !ok { return fmt.Errorf("executor %T does not support sub DAG execution", cmd) } dagExecutor.SetParams(runParams(subRuns[0])) return nil }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/node.go` around lines 541 - 557, In configureSubDAGExecutor, guard the non-parallel branch against an empty subRuns slice before calling runParams(subRuns[0]) to avoid a panic: check len(subRuns) == 0 and return a clear error (e.g., fmt.Errorf("no subRuns provided for non-parallel executor")) if empty; otherwise proceed to cast cmd to executor.DAGExecutor and call SetParams(runParams(subRuns[0])); keep the parallel branch unchanged (still uses executor.ParallelExecutor and runParamsList).

internal/runtime/node_test.go (1)

171-177: Optional: assert `Cancel` is a no-op for non-running, non-waiting states.

This subtest covers the new `NodeWaiting` branch nicely, but the past review thread on this method also called out that `Cancel` should be precise about what it transitions. Consider one or two extra cases (e.g. `NodeSucceeded`, `NodeNotStarted`) asserting the status is left untouched, so any future broadening/narrowing of the switch is caught here.

♻️ Proposed table-style expansion

```diff
-	t.Run("CancelWaiting", func(t *testing.T) {
-		t.Parallel()
-
-		node := runtime.NewNode(core.Step{Name: core.NodeWaiting.String()}, runtime.NodeState{Status: core.NodeWaiting})
-		node.Cancel()
-		require.Equal(t, core.NodeAborted, node.State().Status)
-	})
+	t.Run("Cancel", func(t *testing.T) {
+		t.Parallel()
+		cases := []struct {
+			name     string
+			initial  core.NodeStatus
+			expected core.NodeStatus
+		}{
+			{"Running", core.NodeRunning, core.NodeAborted},
+			{"Waiting", core.NodeWaiting, core.NodeAborted},
+			{"Succeeded", core.NodeSucceeded, core.NodeSucceeded},
+			{"NotStarted", core.NodeNotStarted, core.NodeNotStarted},
+		}
+		for _, tc := range cases {
+			t.Run(tc.name, func(t *testing.T) {
+				node := runtime.NewNode(core.Step{Name: "cancel"}, runtime.NodeState{Status: tc.initial})
+				node.Cancel()
+				require.Equal(t, tc.expected, node.State().Status)
+			})
+		}
+	})
```

As per coding guidelines: "Use stretchr/testify assertions for testing in Go".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/node_test.go` around lines 171 - 177, Add assertions to the CancelWaiting subtest ensuring Node.Cancel is a no-op for states other than running/waiting: instantiate nodes via runtime.NewNode with statuses core.NodeSucceeded and core.NodeNotStarted (or another non-running/non-waiting status), call node.Cancel() for each, and assert using require.Equal that node.State().Status remains unchanged; reference the existing runtime.NewNode, node.Cancel, and node.State() helpers to locate where to add these checks so future changes to the switch are caught.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/spec/loader.go`:
- Around line 686-696: The expandHomeDir function currently treats any string
starting with "~" as expandable (e.g., "~alice/..."), causing incorrect
expansions; modify it to only expand when file == "~" or when it has the "~/..."
prefix. In expandHomeDir, change the guard from strings.HasPrefix(file, "~") to
something like file == "~" || strings.HasPrefix(file, "~/"), call
os.UserHomeDir() as before, and return homeDir when file == "~" or
filepath.Join(homeDir, file[2:]) (or homeDir + file[1:]) when it starts with "~/".
Keep all other paths unchanged.
- Around line 522-527: documentYAML currently returns a re-marshaled YAML for
sub-DAGs which loses formatting; audit all consumers of dest.YamlData
(especially documentYAML, rebuildDAGRunSnapshotFromYAML, and the temp-file
concatenation logic used by dag runner) to confirm none require byte-exact
original YAML; if any do, add a new field (e.g., OriginalYamlData) to the
destination struct and set it to fullData alongside dest.YamlData, then update
the temp-file reconstruction and snapshot code to prefer OriginalYamlData when
present, falling back to dest.YamlData otherwise.
- Around line 423-441: decodeDocuments currently uses the loop variable index
for dagDocument.index but increments it for empty docs too, causing the first
non-empty document to get a wrong higher index; update decodeDocuments so it
maintains a separate ordinal (e.g., nonEmptyIndex) that is incremented only when
a non-empty doc is appended and assign that to dagDocument.index, leaving the
loop index for decoding errors as-is; this ensures buildDocumentContext and
documentYAML see the true ordinal of the first non-empty DAG.
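
The `expandHomeDir` item above can be sketched as follows. This is a minimal illustration, not the repository's code: `expandHome` is a hypothetical stand-in name, and the use of `filepath.Join` for the `~/` case is an assumption about how the result should be assembled.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// expandHome is a hypothetical stand-in for expandHomeDir. It expands only
// "~" and paths starting with "~/"; anything else, including "~alice/...",
// is returned unchanged.
func expandHome(file string) (string, error) {
	if file != "~" && !strings.HasPrefix(file, "~/") {
		return file, nil
	}
	homeDir, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("resolve home directory: %w", err)
	}
	if file == "~" {
		return homeDir, nil
	}
	// file[2:] drops the leading "~/" before joining with the home directory.
	return filepath.Join(homeDir, file[2:]), nil
}

func main() {
	paths := []string{"~", "~/dags/a.yaml", "~alice/dags/a.yaml", "/etc/dags/a.yaml"}
	for _, p := range paths {
		expanded, err := expandHome(p)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%-20s -> %s\n", p, expanded)
	}
}
```

The early return keeps the "leave all other paths unchanged" rule in one place, so `~alice/...` never reaches `os.UserHomeDir`.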
In `@internal/runtime/node.go`:
- Around line 656-663: The switch in Node.Cancel() is non-exhaustive and trips
the exhaustive linter; change it to either an if-check or add a default to
satisfy golangci-lint: replace the switch on n.Status() that only handles
core.NodeRunning and core.NodeWaiting with a simple if (e.g., if s :=
n.Status(); s == core.NodeRunning || s == core.NodeWaiting {
n.SetStatus(core.NodeAborted) }), or keep the switch with its existing cases
and add an empty default case. Reference Node.Cancel, Node.Status, and
SetStatus when making the change.
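
A minimal sketch of the if-check variant suggested above. The `NodeStatus` constants and the `Node` type here are simplified, hypothetical stand-ins for the `core` and `runtime` types (no locking, no other fields), included only so the example runs on its own.

```go
package main

import "fmt"

// NodeStatus is a simplified stand-in for core.NodeStatus (assumption).
type NodeStatus int

const (
	NodeNotStarted NodeStatus = iota
	NodeRunning
	NodeWaiting
	NodeSucceeded
	NodeAborted
)

// Node is a simplified stand-in for runtime.Node (assumption).
type Node struct{ status NodeStatus }

func (n *Node) Status() NodeStatus     { return n.status }
func (n *Node) SetStatus(s NodeStatus) { n.status = s }

// Cancel aborts only running or waiting nodes. Using an if-check instead of
// a partial switch leaves nothing for an exhaustiveness linter to flag.
func (n *Node) Cancel() {
	if s := n.Status(); s == NodeRunning || s == NodeWaiting {
		n.SetStatus(NodeAborted)
	}
}

func main() {
	for _, initial := range []NodeStatus{NodeNotStarted, NodeRunning, NodeWaiting, NodeSucceeded} {
		n := &Node{status: initial}
		n.Cancel()
		fmt.Printf("initial=%d aborted=%t\n", initial, n.Status() == NodeAborted)
	}
}
```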
---
Outside diff comments:
In `@internal/cmn/sock/server_test.go`:
- Around line 92-95: The goroutine in TestHeaderOnlyResponse/TestEmptyResponse
starts unixServer.Serve(ctx, listen) and then calls unixServer.Shutdown after
Serve returns, which never happens because Shutdown is never invoked externally;
this leaks a goroutine. Fix by mirroring the pattern from
TestMultipleWritesProduceSingleResponseBody/TestStartAndShutdownServer: start
Serve in a goroutine but control shutdown from the test by canceling or calling
unixServer.Shutdown(ctx) from the test (or using a cancellable context passed
into Serve) and wait for Serve to return (e.g., via a sync.WaitGroup or channel)
so you can assert its error; update the goroutine that calls unixServer.Serve to
expose a way for the test to trigger unixServer.Shutdown and to wait for Serve's
completion.
---
Nitpick comments:
In `@internal/cmn/mailer/mailer_test.go`:
- Around line 203-221: The current test only exercises composeMail directly; add
an integration-style test that exercises Send to ensure its recipients,
safeFrom, and safeSubject plumbing can't reintroduce header injection upstream
of composeMail: create a test (e.g., TestSendSanitizesHeaders) that initializes
the mailer via New(Config{}), starts the existing mock SMTP server used in other
tests, calls client.Send with malicious values for to/from/subject containing
CRLF and injected headers, and then inspect the SMTP-received payload to assert
it does not contain CRLF-injected header lines and instead contains the
flattened forms (reference composeMail, Send, New, Config to locate code paths
to validate).
In `@internal/cmn/mailer/mailer.go`:
- Around line 347-364: The attachment builder in addAttachments writes the
"Content-Transfer-Encoding: base64" header twice for each attachment which can
break strict MIME parsers; update addAttachments to emit that header only once
per attachment (remove the duplicate write call — the second occurrence just
before writing the base64 payload) while keeping the existing Content-Type,
Content-Disposition, boundary usage, and base64 encoding of data intact so each
attachment block contains a single Content-Transfer-Encoding header.
- Around line 183-196: Update sanitizeHeaderField to be stricter: after applying
replacer.Replace(value) also remove remaining C0 control characters (bytes
0x00–0x1F and 0x7F) except horizontal tab, and truncate the result to a
reasonable max length (e.g., 256 chars) to avoid absurdly long header values;
reference the sanitizeHeaderField function and the replacer variable so you
locate the code, and keep sanitizeAddresses behavior unchanged.
- Around line 135-178: The code currently only wraps SMTP errors (MAIL FROM,
RCPT TO, DATA, write, Close) when useAuth is true, causing inconsistent/poorer
diagnostics for non-auth paths; update the send flow (the block after
prepareSession including calls to c.Mail, c.Rcpt, c.Data, wc.Write, wc.Close and
the payload construction via composeMail/processEmailBody) to always wrap
returned errors with descriptive context (e.g., "MAIL FROM failed", "RCPT TO
failed", "DATA command failed", "failed to write email body", "failed to close
data writer") and remove the conditional branches that return the bare err based
on useAuth so error wrapping is consistent with prepareSession.
- Around line 309-324: composeMail assembles the multipart payload but writes an
extra trailing CRLF pair after closing the final boundary; remove the final
buf.WriteString("\r\n\r\n") so the epilogue ends with "\r\n\r\n--" + boundary +
"--\r\n\r\n" (i.e., keep composeHeader, base64 encoding of body, addAttachments
and the boundary close but drop the redundant trailing CRLF write) to produce a
tighter multipart payload.
- Around line 292-307: composeHeader currently re-sanitizes to/from/subject even
though send already sanitizes them; keep sanitization in composeHeader
(defense-in-depth) and simplify send by passing raw values instead of
pre-sanitized ones: in the send function stop calling sanitizeAddresses for the
recipients passed into composeHeader (remove creation/use of recipients variable
derived from sanitizeAddresses) and pass the original to/from/subject variables
(remove safeFrom/safeSubject usage), leaving composeHeader to call
sanitizeAddresses and sanitizeHeaderField as it currently does; update any
related variable names in send to reflect raw inputs and ensure tests/other
callers still supply unsanitized strings if they relied on previous behavior.
- Around line 80-100: sendWithNoAuth and sendWithAuth are test-only helpers and
should be moved into the test file to clarify intent: remove these two functions
from mailer.go and paste them into mailer_test.go (keeping them unexported and
in the same package so they can access Client and mailTimeout), update any test
imports or references if necessary, and run tests to ensure visibility of
Client, mailTimeout, and the methods remain correct when moved to the _test.go
file.
In `@internal/cmn/sock/server.go`:
- Around line 78-109: The server currently spawns serveConn goroutines that
aren't tracked and lack panic recovery, so Shutdown/Serve can't wait for them
and a handler panic can crash the process; add a sync.WaitGroup field (e.g.,
srv.wg) to increment in Serve before launching each goroutine and defer
wg.Done() inside serveConn, add a defer recover() wrapper at the top of
serveConn to log and swallow panics, and make Shutdown (or Serve's deferred
cleanup) call srv.wg.Wait() before returning to ensure graceful drain.
In `@internal/core/step.go`:
- Around line 191-197: Replace the manual defensive copy of s.Args in
Step.legacyCommandEntry with the idiomatic slices.Clone call: use
slices.Clone(s.Args) for the Args field, and add "slices" to the imports; keep
the rest of the returned CommandEntry (Command and CmdWithArgs) unchanged to
preserve behavior when s.Args is nil.
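
A small check of the `slices.Clone` behavior this item relies on, assuming Go 1.21 or later for the standard-library `slices` package. `cloneArgs` is a hypothetical helper used only for illustration; it is not part of `Step`.

```go
package main

import (
	"fmt"
	"slices"
)

// cloneArgs is a hypothetical helper showing the defensive copy on its own.
func cloneArgs(args []string) []string {
	return slices.Clone(args)
}

func main() {
	var nilArgs []string
	args := []string{"-c", "echo hi"}

	cloned := cloneArgs(args)
	cloned[0] = "-x" // mutating the copy must not touch the original

	fmt.Println("nil stays nil:", cloneArgs(nilArgs) == nil)
	fmt.Println("original first arg:", args[0])
	fmt.Println("cloned first arg:", cloned[0])
}
```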
In `@internal/runtime/agent/reporter_test.go`:
- Around line 456-460: The test TestStatusBadgeClassPartiallySucceeded only
checks one branch; change it to a table-driven test that enumerates all known
status inputs and expected outputs for statusBadgeClass (including all 9
distinct branches and the default/empty-string case), iterate the table with
t.Run subtests (calling t.Parallel() per subtest if desired) and assert each
mapping with require.Equal, so the behavior of statusBadgeClass is fully
covered; keep the test named to reflect table-driven style (e.g.,
TestStatusBadgeClass) and reference statusBadgeClass in each subtest.
In `@internal/runtime/node_test.go`:
- Around line 171-177: Add assertions to the CancelWaiting subtest ensuring
Node.Cancel is a no-op for states other than running/waiting: instantiate nodes
via runtime.NewNode with statuses core.NodeSucceeded and core.NodeNotStarted (or
another non-running/non-waiting status), call node.Cancel() for each, and assert
using require.Equal that node.State().Status remains unchanged; reference the
existing runtime.NewNode, node.Cancel, and node.State() helpers to locate where
to add these checks so future changes to the switch are caught.
In `@internal/runtime/node.go`:
- Around line 541-557: In configureSubDAGExecutor, guard the non-parallel branch
against an empty subRuns slice before calling runParams(subRuns[0]) to avoid a
panic: check len(subRuns) == 0 and return a clear error (e.g., fmt.Errorf("no
subRuns provided for non-parallel executor")) if empty; otherwise proceed to
cast cmd to executor.DAGExecutor and call SetParams(runParams(subRuns[0])); keep
the parallel branch unchanged (still uses executor.ParallelExecutor and
runParamsList).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 68c93ad5-4970-4138-b161-1bfa1e31a81d
📒 Files selected for processing (15)
- internal/cmn/mailer/mailer.go
- internal/cmn/mailer/mailer_test.go
- internal/cmn/sock/server.go
- internal/cmn/sock/server_test.go
- internal/core/spec/loader.go
- internal/core/spec/loader_test.go
- internal/core/step.go
- internal/core/step_test.go
- internal/runtime/agent/reporter.go
- internal/runtime/agent/reporter_test.go
- internal/runtime/node.go
- internal/runtime/node_test.go
- internal/service/scheduler/scheduler.go
- internal/service/scheduler/scheduler_test.go
- internal/service/scheduler/wait_for_tick_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/runtime/agent/reporter.go