Skip to content

Commit

Permalink
Don't use async handler (knative#7415) (#515)
Browse files Browse the repository at this point in the history
* Don't use async handler



* Fix Kn-Namespace header, potential race condition for sync receiver



* try refactoring test



* take 2 on test fix



---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Feb 2, 2024
1 parent 20f15dc commit 2303bba
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Subscription struct {
// Config for a fanout.EventHandler.
type Config struct {
Subscriptions []Subscription `json:"subscriptions"`
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
}
Expand Down Expand Up @@ -240,6 +240,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch
reportArgs := channel.ReportArgs{}
reportArgs.EventType = event.Type()
reportArgs.Ns = ref.Namespace
additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace)
dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders)
return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
Expand Down Expand Up @@ -302,7 +303,6 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
if dispatchResult.err != nil {
f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err))
dispatchResultForFanout.err = dispatchResult.err
return dispatchResultForFanout
}
case <-time.After(f.timeout):
f.logger.Error("Fanout timed out")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.C
HostName: imc.Status.Address.URL.Host,
Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name),
FanoutConfig: fanout.Config{
AsyncHandler: true,
AsyncHandler: false,
Subscriptions: subs,
},
}, nil
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func TestBrokerConformance(t *testing.T) {

// Install and wait for a Ready Broker.
env.Prerequisite(ctx, t, broker.GoesReady("default", b.WithEnvConfig()...))
env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...))
env.TestSet(ctx, t, broker.DataPlaneConformance("default"))
env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...))
}

func TestBrokerDefaultDelivery(t *testing.T) {
Expand Down

0 comments on commit 2303bba

Please sign in to comment.