Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions bindings/go/dag/sync/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"sync"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -155,7 +156,9 @@ func (d *GraphDiscoverer[K, V]) Discover(ctx context.Context) (retErr error) {
errGroup, errgroupCtx := errgroup.WithContext(ctx)
for _, root := range d.opts.Roots {
errGroup.Go(func() error {
return d.discover(errgroupCtx, root)
// create empty path for each root
path := make(map[K]bool)
return d.discover(errgroupCtx, root, path)
})
}

Expand All @@ -169,6 +172,7 @@ func (d *GraphDiscoverer[K, V]) Discover(ctx context.Context) (retErr error) {
func (d *GraphDiscoverer[K, V]) discover(
ctx context.Context,
id K,
path map[K]bool,
) error {
// Early abort if context is cancelled.
select {
Expand All @@ -177,6 +181,11 @@ func (d *GraphDiscoverer[K, V]) discover(
default:
}

// if we have a match, error out
if path[id] {
return fmt.Errorf("cyclic dependency detected: vertex %v is already in the current discovery path", id)
}

// Setup done channel for this vertex.
ch := make(chan struct{})
defer close(ch)
Expand All @@ -193,6 +202,11 @@ func (d *GraphDiscoverer[K, V]) discover(
}
}

// add this vertex to the current discovery path
path[id] = true
// remove this vertex from the path when done (part of the backtracking)
defer delete(path, id)

// Add vertex in "discovering" state.
if err := d.graph.WithWriteLock(func(d *dag.DirectedAcyclicGraph[K]) error {
return d.AddVertex(id, map[string]any{
Expand Down Expand Up @@ -237,7 +251,10 @@ func (d *GraphDiscoverer[K, V]) discover(
errGroup, egctx := errgroup.WithContext(ctx)
for index, neighbor := range neighbors {
errGroup.Go(func() error {
if err := d.discover(egctx, neighbor); err != nil {
// create a copy for this particular neighbor
childPath := maps.Clone(path)

if err := d.discover(egctx, neighbor, childPath); err != nil {
return fmt.Errorf("failed to discover reference %v: %w", neighbor, err)
}
// Add edge from current vertex to neighbor.
Expand Down
45 changes: 45 additions & 0 deletions bindings/go/dag/sync/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,51 @@ func TestDAGDiscovery(t *testing.T) {
r.Contains([]DiscoveryState{DiscoveryStateError, DiscoveryStateUnknown}, cState, "expected vertex C to be in error state, but got %s", cState)
})

t.Run("graph discovery fails in case of cyclic dependency", func(t *testing.T) {
ctx := t.Context()
r := require.New(t)
// Emulate an invalid external dependency graph. Here, the edge C -> D
// exists, but D is not found in the graph.
// A
// / \
// B - B
Comment on lines +135 to +139

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment does not seem to describe a cyclic dependency.

graph := map[string][]string{
"A": {"B"},
"B": {"A"},
}
dag := NewGraphDiscoverer(&GraphDiscovererOptions[string, string]{
Roots: []string{"A"},
Resolver: ResolverFunc[string, string](func(ctx context.Context, key string) (value string, err error) {
if _, ok := graph[key]; !ok {
return "", fmt.Errorf("no node found with ID %s", key)
}
return key, nil
}),
Discoverer: DiscovererFunc[string, string](func(ctx context.Context, parent string) (children []string, err error) {
dep, ok := graph[parent]
if !ok {
return nil, fmt.Errorf("no node found with ID %s", parent)
}
var neighbors []string
for _, id := range dep {
neighbors = append(neighbors, id)
}
return neighbors, nil
}),
})

err := dag.Discover(ctx)
r.Error(err, "expected error due to missing node in the external graph, but got nil")

aState := dag.CurrentState("A")
r.Equal(DiscoveryStateError, aState, "expected vertex A to be in error state, but got %s", aState)

// because of discovers property to abort early, if 2 nodes on the layer are running in parallel,
// and one of them fails, the other one might be in an unknown state as it might not have yet been discovered.
bState := dag.CurrentState("B")
r.Contains([]DiscoveryState{DiscoveryStateError, DiscoveryStateUnknown}, bState, "expected vertex B to be in error state, but got %s", bState)
})

t.Run("graph discovery fails in discovery function", func(t *testing.T) {
ctx := t.Context()
r := require.New(t)
Expand Down
Loading