diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 6f82030e4ea..b6c6b5ac8bc 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -55,7 +55,7 @@ type Subscription struct { // Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` - // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. AsyncHandler bool `json:"asyncHandler,omitempty"` } @@ -240,6 +240,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch reportArgs := channel.ReportArgs{} reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace + additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace) dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders) return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) } @@ -302,7 +303,6 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription, if dispatchResult.err != nil { f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err)) dispatchResultForFanout.err = dispatchResult.err - return dispatchResultForFanout } case <-time.After(f.timeout): f.logger.Error("Fanout timed out") diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 1e5309cac67..6d9a33b4945 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -225,7 +225,7 @@ func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.C HostName: imc.Status.Address.URL.Host, Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), FanoutConfig: fanout.Config{ - AsyncHandler: true, + AsyncHandler: false, Subscriptions: subs, }, }, nil diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index 86532aa03c9..a6b2409c442 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -139,8 +139,8 @@ func TestBrokerConformance(t *testing.T) { // Install and wait for a Ready Broker. env.Prerequisite(ctx, t, broker.GoesReady("default", b.WithEnvConfig()...)) - env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) env.TestSet(ctx, t, broker.DataPlaneConformance("default")) + env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) } func TestBrokerDefaultDelivery(t *testing.T) {