Skip to content

Commit

Permalink
feat: support Error Handler
Browse files Browse the repository at this point in the history
  • Loading branch information
squakez committed Jan 11, 2025
1 parent 01c4e33 commit 3e3b4cb
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 7 deletions.
18 changes: 16 additions & 2 deletions pkg/util/source/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type catalog2deps func(*camel.RuntimeCatalog) []string

const (
defaultJSONDataFormat = "jackson"
kamelet = "kamelet"
)

var (
Expand All @@ -51,6 +52,7 @@ var (
jsonLibraryRegexp = regexp.MustCompile(`.*JsonLibrary\.Jackson.*`)
jsonLanguageRegexp = regexp.MustCompile(`.*\.json\(\).*`)
beanRegexp = regexp.MustCompile(`.*\.bean\(.*\).*`)
errorHandlerRegexp = regexp.MustCompile(`errorHandler\s*\(\s*deadLetterChannel\s*\(\s*["|']([a-zA-Z0-9-]+:[^"|']+)["|']\s*\).*`)
circuitBreakerRegexp = regexp.MustCompile(`.*\.circuitBreaker\(\).*`)
restConfigurationRegexp = regexp.MustCompile(`.*restConfiguration\(\).*`)
restRegexp = regexp.MustCompile(`.*rest\s*\([^)]*\).*`)
Expand Down Expand Up @@ -253,7 +255,7 @@ func (i *baseInspector) extract(source v1.SourceSpec, meta *Metadata,
meta.ToURIs = append(meta.ToURIs, to...)

for _, k := range kameletEips {
AddKamelet(meta, "kamelet:"+k)
AddKamelet(meta, kamelet+":"+k)
}

if err := i.discoverCapabilities(source, meta); err != nil {
Expand Down Expand Up @@ -350,6 +352,18 @@ func (i *baseInspector) discoverDependencies(source v1.SourceSpec, meta *Metadat
}
}

for _, match := range errorHandlerRegexp.FindAllStringSubmatch(source.Content, -1) {
if len(match) > 1 {
_, scheme := i.catalog.DecodeComponent(match[1])
if dfDep := i.catalog.GetArtifactByScheme(scheme.ID); dfDep != nil {
meta.AddDependency(dfDep.GetDependencyID())
}
if scheme.ID == kamelet {
AddKamelet(meta, match[1])
}
}
}

return nil
}

Expand Down Expand Up @@ -430,7 +444,7 @@ func (i *baseInspector) hasOnlyPassiveEndpoints(fromURIs []string) bool {

func (i *baseInspector) containsOnlyURIsIn(fromURI []string, allowed map[string]bool) bool {
for _, uri := range fromURI {
if uri == "kamelet:source" {
if uri == kamelet+":source" {
continue
}
prefix := i.getURIPrefix(uri)
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/source/inspector_java_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,24 @@ func TestJavaBeanDependencies(t *testing.T) {
assert.Contains(t, meta.Dependencies.List(), "camel:log")
})
}

func TestErrorHandlerDependencies(t *testing.T) {
inspector := newTestJavaSourceInspector(t)

sourceSpec := &v1.SourceSpec{
DataSpec: v1.DataSpec{
Name: "test.java",
Content: `
public void configure() throws Exception {
errorHandler(deadLetterChannel("seda:error"));
from("timer:foo").to("log:bar");
}
`,
},
}
assertExtract(t, inspector, sourceSpec.Content, func(meta *Metadata) {
assert.Contains(t, meta.Dependencies.List(), "camel:timer")
assert.Contains(t, meta.Dependencies.List(), "camel:seda")
assert.Contains(t, meta.Dependencies.List(), "camel:log")
})
}
16 changes: 14 additions & 2 deletions pkg/util/source/inspector_xml.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error {
}
}
}
case "deadLetterChannel":
for _, a := range se.Attr {
if a.Name.Local == "deadLetterUri" {
_, scheme := i.catalog.DecodeComponent(a.Value)
if dfDep := i.catalog.GetArtifactByScheme(scheme.ID); dfDep != nil {
meta.AddDependency(dfDep.GetDependencyID())
}
if scheme.ID == kamelet {
AddKamelet(meta, a.Value)
}
}
}
case "from", "fromF":
for _, a := range se.Attr {
if a.Name.Local == URI {
Expand All @@ -89,10 +101,10 @@ func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error {
meta.ToURIs = append(meta.ToURIs, a.Value)
}
}
case "kamelet":
case kamelet:
for _, a := range se.Attr {
if a.Name.Local == "name" {
AddKamelet(meta, "kamelet:"+a.Value)
AddKamelet(meta, kamelet+":"+a.Value)
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/source/inspector_xml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,22 @@ func TestXMLBeanDependencies(t *testing.T) {
assert.Contains(t, meta.Dependencies.List(), "camel:log")
})
}

func TestXMLErrorHandlerDependencies(t *testing.T) {
xmlCode := `
<errorHandler>
<deadLetterChannel deadLetterUri="seda:dead">
<redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="250"/>
</deadLetterChannel>
</errorHandler>
<from uri="timer:foo"/>
<to uri="log:bar"></to>
`
inspector := newTestXMLInspector(t)

assertExtract(t, inspector, xmlCode, func(meta *Metadata) {
assert.Contains(t, meta.Dependencies.List(), "camel:timer")
assert.Contains(t, meta.Dependencies.List(), "camel:seda")
assert.Contains(t, meta.Dependencies.List(), "camel:log")
})
}
16 changes: 13 additions & 3 deletions pkg/util/source/inspector_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata
}
}
}
case "kamelet":
case kamelet:
switch t := content.(type) {
case string:
AddKamelet(meta, "kamelet:"+t)
AddKamelet(meta, kamelet+":"+t)
case map[interface{}]interface{}:
if name, ok := t["name"].(string); ok {
AddKamelet(meta, "kamelet:"+name)
AddKamelet(meta, kamelet+":"+name)
}
}
}
Expand Down Expand Up @@ -170,6 +170,16 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata
return err
}
}
case "deadLetterUri":
if s, ok := v.(string); ok {
_, scheme := i.catalog.DecodeComponent(s)
if dfDep := i.catalog.GetArtifactByScheme(scheme.ID); dfDep != nil {
meta.AddDependency(dfDep.GetDependencyID())
}
if scheme.ID == kamelet {
AddKamelet(meta, s)
}
}
default:
// Always follow children because from/to uris can be nested
if ks, ok := k.(string); ok {
Expand Down
54 changes: 54 additions & 0 deletions pkg/util/source/inspector_yaml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,3 +776,57 @@ func TestYamlBeanDependencies(t *testing.T) {
assert.Contains(t, meta.Dependencies.List(), "camel:log")
})
}

func TestYAMLErrorHandler(t *testing.T) {
yamlContractFirst := `
- errorHandler:
deadLetterChannel:
deadLetterUri: kafka:my-dlc
- route:
id: route1
from:
uri: "timer:tick"
parameters:
period: "5000"
steps:
- setBody:
constant: "Hello Yaml !!!"
- transform:
simple: "${body.toUpperCase()}"
- to: "{{url}}"
`

inspector := newTestYAMLInspector(t)
t.Run("TestYAMLErrorHandler", func(t *testing.T) {
assertExtractYAML(t, inspector, yamlContractFirst, func(meta *Metadata) {
assert.Contains(t, meta.Dependencies.List(), "camel:kafka")
})
})
}

func TestYAMLErrorHandlerKamelet(t *testing.T) {
yamlContractFirst := `
- errorHandler:
deadLetterChannel:
deadLetterUri: kamelet:my-kamelet/errorHandler
- route:
id: route1
from:
uri: "timer:tick"
parameters:
period: "5000"
steps:
- setBody:
constant: "Hello Yaml !!!"
- transform:
simple: "${body.toUpperCase()}"
- to: "{{url}}"
`

inspector := newTestYAMLInspector(t)
t.Run("TestYAMLErrorHandler", func(t *testing.T) {
assertExtractYAML(t, inspector, yamlContractFirst, func(meta *Metadata) {
assert.Contains(t, meta.Kamelets, "my-kamelet/error")
})
})
}

0 comments on commit 3e3b4cb

Please sign in to comment.