Skip to content
/ Joker Public
forked from tomasfabian/Joker

Reactive data changes from SQL server to .NET clients. SqlTableDependency extensions, Joker.OData, Joker.Redis, Joker.MVVM and ksqlDB LINQ provider

License

Notifications You must be signed in to change notification settings

pailyn/Joker

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Data change notifications from Sql Server via SqlTableDependency, OData and Redis to different .NET clients (WinUI3 - UWP and Win32 apps, WPF, Blazor Wasm, etc). Blazor Wasm notifications are redirected with SignalR.

Joker in action

Kafka.DotNet.ksqlDB push queries LINQ provider

Kafka.DotNet.ksqlDB package generates ksql queries from your C# linq queries. For more information check the Wiki. You can filter, project, limit etc your push notifications server side with ksqlDB push queries

This project was moved to a separate (repository Kafka.DotNet.ksqlDB)[https://github.com/tomasfabian/Kafka.DotNet.ksqlDB]

Install-Package Kafka.DotNet.ksqlDB
await using var context = new KSqlDBContext(@"http:\\localhost:8088");

using var disposable = context.CreateQueryStream<Tweet>()
  .Where(p => p.Message != "Hello world" || p.Id == 1)
  .Where(p => p.RowTime >= 1510923225000)
  .Select(l => new { l.Id, l.Message, l.RowTime })
  .Take(2)     
  .ToObservable() // client side processing starts here lazily after subscription
  .Delay(TimeSpan.FromSeconds(2)) // IObservable extensions
  .ObserveOn(TaskPoolScheduler.Default)
  .Subscribe(tweetMessage =>
  {
    Console.WriteLine($"{nameof(Tweet)}: {tweetMessage.Id} - {tweetMessage.Message}");
    Console.WriteLine();
  }, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

CDC - Push notifications from Sql Server tables with Kafka

Monitor Sql Server tables for changes and forward them to the appropriate Kafka topics. You can consume (react to) these row-level table changes (CDC - Change Data Capture) from Sql Server databases with Kafka.DotNet.SqlServer package together with the Debezium connector streaming platform.

Nuget

Install-Package Kafka.DotNet.SqlServer -Version 0.2.0-rc.2

Kafka.DotNet.SqlServer WIKI

Full example is available in Blazor example - Kafka.DotNet.InsideOut.sln:

using System;
using System.Threading;
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.KSql.Linq;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;
using Kafka.DotNet.ksqlDB.KSql.Query.Options;
using Kafka.DotNet.SqlServer.Cdc;
using Kafka.DotNet.SqlServer.Cdc.Extensions;

class Program
{
  static string connectionString = @"Server=127.0.0.1,1433;User Id = SA;Password=<YourNewStrong@Passw0rd>;Initial Catalog = Sensors;MultipleActiveResultSets=true";

  static string bootstrapServers = "localhost:29092";
  static string KsqlDbUrl => @"http:\\localhost:8088";

  static string tableName = "Sensors";
  static string schemaName = "dbo";

  private static ISqlServerCdcClient CdcClient { get; set; }

  static async Task Main(string[] args)
  {
    CdcClient = new CdcClient(connectionString);

    await CreateSensorsCdcStreamAsync();

    await TryEnableCdcAsync();

    await CreateConnectorAsync();

    await using var context = new KSqlDBContext(KsqlDbUrl);

    var semaphoreSlim = new SemaphoreSlim(0, 1);

    var cdcSubscription = context.CreateQuery<RawDatabaseChangeObject<IoTSensor>>("sqlserversensors")
      .WithOffsetResetPolicy(AutoOffsetReset.Latest)
      .Take(5)
      .ToObservable()
      .Subscribe(cdc =>
        {
          var operationType = cdc.OperationType;
          Console.WriteLine(operationType);

          switch (operationType)
          {
            case ChangeDataCaptureType.Created:
              Console.WriteLine($"Value: {cdc.EntityAfter.Value}");
              break;
            case ChangeDataCaptureType.Updated:

              Console.WriteLine($"Value before: {cdc.EntityBefore.Value}");
              Console.WriteLine($"Value after: {cdc.EntityAfter.Value}");
              break;
            case ChangeDataCaptureType.Deleted:
              Console.WriteLine($"Value: {cdc.EntityBefore.Value}");
              break;
          }
        }, onError: error =>
        {
          semaphoreSlim.Release();

          Console.WriteLine($"Exception: {error.Message}");
        },
        onCompleted: () =>
        {
          semaphoreSlim.Release();
          Console.WriteLine("Completed");
        });


    await semaphoreSlim.WaitAsync();

    using (cdcSubscription)
    {
    }
  }
}

public record IoTSensor
{
	public string SensorId { get; set; }
	public int Value { get; set; }
}

Consuming table change notifications directly from a Kakka topic:

using System;
using System.Linq;
using System.Threading.Tasks;
using Blazor.Sample.Data.Sensors;
using Confluent.Kafka;
using Kafka.DotNet.InsideOut.Consumer;
using Kafka.DotNet.SqlServer.Cdc;

async Task ConsumeFromTopicAsync()
{
  string bootstrapServers = "localhost:29092";

  var consumerConfig = new ConsumerConfig
  {
    BootstrapServers = bootstrapServers,
    GroupId = "Client-01",
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest
  };

  var kafkaConsumer =
    new KafkaConsumer<string, DatabaseChangeObject<IoTSensor>>("sqlserver2019.dbo.Sensors", consumerConfig);

  var dataChanges = kafkaConsumer.ConnectToTopic().ToAsyncEnumerable().Where(c => c.Message.Value.OperationType != ChangeDataCaptureType.Read).Take(2);
	
  await foreach (var consumeResult in dataChanges)
  {
    var message = consumeResult.Message;
    var changeNotification = message.Value; 

    Console.WriteLine(changeNotification.OperationType);
    Console.WriteLine(changeNotification.Before?.Value);
    Console.WriteLine(changeNotification.After?.Value);
  }

  using (kafkaConsumer)
  {		
  }
}

Kafka stream processing

Kafka.DotNet.InsideOut is a client API for producing and consuming kafka topics and ksqlDB push queries and views generated with Kafka.DotNet.ksqlDB

Install-Package Kafka.DotNet.ksqlDB
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.KSql.Linq.Statements;
using Kafka.DotNet.ksqlDB.KSql.Query.Context;
using Kafka.DotNet.ksqlDB.KSql.RestApi.Extensions;

private string KsqlDbUrl => "http://localhost:8088";

private async Task CreateOrReplaceMaterializedTableAsync()
{
  string ksqlDbUrl = Configuration[ConfigKeys.KSqlDb_Url];

  await using var context = new KSqlDBContext(ksqlDbUrl);

  var statement = context.CreateOrReplaceTableStatement(tableName: "SensorsTable")
    .As<IoTSensor>(TopicNames.IotSensors)
    .Where(c => c.SensorId != "Sensor-5")
    .GroupBy(c => c.SensorId)
    .Select(c => new {SensorId = c.Key, Count = c.Count(), AvgValue = c.Avg(a => a.Value) });

  var httpResponseMessage = await statement.ExecuteStatementAsync();

  if (httpResponseMessage.IsSuccessStatusCode)
  {
    var statementResponses = httpResponseMessage.ToStatementResponses();
  }
  else
  {
    var statementResponse = httpResponseMessage.ToStatementResponse();
  }
}
public class SensorsTableConsumer : KafkaConsumer<string, IoTSensorStats>
{
  public SensorsTableConsumer(ConsumerConfig consumerConfig)
    : base("SENSORSTABLE", consumerConfig)
  {
  }
}

public record IoTSensorStats
{
  public string SensorId { get; set; }
  public double AvgValue { get; set; }
  public int Count { get; set; }
}
Install-Package Kafka.DotNet.InsideOut -Version 1.0.0
Install-Package System.Interactive.Async -Version 5.0.0
using System;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.DotNet.InsideOut.Consumer;

const string bootstrapServers = "localhost:29092";

static async Task Main(string[] args)
{
  var consumerConfig = new ConsumerConfig
                       {
                         BootstrapServers = bootstrapServers,
                         GroupId = "Client-01",
                         AutoOffsetReset = AutoOffsetReset.Latest
                       };

  var kafkaConsumer = new KafkaConsumer<string, IoTSensorStats>("IoTSensors", consumerConfig);

  await foreach (var consumeResult in kafkaConsumer.ConnectToTopic().ToAsyncEnumerable().Take(10))
  {
    Console.WriteLine(consumeResult.Message);
  }

  using (kafkaConsumer)
  { }
}

Blazor server side example - Kafka.DotNet.InsideOut.sln

Joker Model-View-ViewModel:

Reactive view models for data changes

Install-Package Joker.MVVM

Joker OData:

Plumbing code for OData web services. Support for batching and end points. Please help out the community by sharing your suggestions and code improvements:

Preview:

Redis TableDependency status notifier Sql server data changes refresher via Redis with End to end reconnections

SqlTableDependency.Extensions

If sharing increases coupling, should we share at all? Is it time for decoupling?

I don't think so.

Please use data streaming and process it with the help of reactive programming and event driven paradigms.

Install:

https://www.nuget.org/packages/SqlTableDependency.Extensions/

Install-Package SqlTableDependency.Extensions

or

dotnet add package SqlTableDependency.Extensions --version 3.0.0

See:

Following package is based on christiandelbianco's SqlTableDependency: https://github.com/christiandelbianco/monitor-table-change-with-sqltabledependency

SqlTableDependency.Extension.SqlTableDependencyProvider provides periodic reconnections in case of any error, like lost connection etc.

Currently there are 3 LifetimeScopes:

ConnectionScope:

In case that the connection is lost, database objects will be deleted after timeout period or during disposal. During all reconnections the database objects are newly (re)created.

ApplicationScope:

In case that the connection is lost, database objects will be deleted only after timeout period. After reconnection the database objects are recreated in case that the conversation handle does not exist anymore. Otherwise the database objects are preserved and reused. If the application was closed the conversation will not continue after app restart. You shouldn't lost data changes within the timeout period. The messages will be delivered after the reconnection.

UniqueScope:

In case that the connection is lost, database objects will be deleted only after timeout period. After reconnection the database objects are recreated only in case, that the conversation handle does not exist anymore. Otherwise the database objects are preserved and reused. If the application was closed and the conversation was not cleaned it will be reused after app restarts. You shouldn't lost data changes within the timeout period. The messages will be delivered after the reconnection.

Wiki Samples

Docker for external dependencies:

MS SQL Server 2017:

docker run --name sql -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=<YourNewStrong@Passw0rd>" -p 1401:1433 -d mcr.microsoft.com/mssql/server:2017-latest

Redis latest:

docker run --name redis-server -p 6379:6379 -d redis

Examples Entity Framework migrations:

Package Manager Console (Default project => Examples\Samples.Data):

Update-Database -ConnectionString "Server=127.0.0.1,1401;User Id = SA;Password=<YourNewStrong@Passw0rd>;Initial Catalog = Test;" -ConnectionProviderName "System.Data.SqlClient" -ProjectName Sample.Data -verbose

Basic usage:

Enable Service Broker in MS SQL SERVER

ALTER DATABASE [DatabaseName]
    SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;

// C#

  public class Product
  {
      public int Id { get; set; }
      public string Name { get; set; }
  } 
  
  using SqlTableDependency.Extensions;
  using SqlTableDependency.Extensions.Enums;
  
  internal class ProductsSqlTableDependencyProvider : SqlTableDependencyProvider<Product>
  {
    private readonly ILogger logger;

    internal ProductsSqlTableDependencyProvider(ConnectionStringSettings connectionStringSettings, IScheduler scheduler, ILogger logger)
      : base(connectionStringSettings, scheduler, LifetimeScope.UniqueScope)
    {
      this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    internal ProductsSqlTableDependencyProvider(string connectionString, IScheduler scheduler, ILogger logger)
      : base(connectionString, scheduler, LifetimeScope.UniqueScope)
    {
      this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    protected override ModelToTableMapper<Product> OnInitializeMapper(ModelToTableMapper<Product> modelToTableMapper)
    {
      modelToTableMapper.AddMapping(c => c.Id, nameof(Product.Id));

      return modelToTableMapper;
    }

    protected override void OnInserted(Product product)
    {
      base.OnInserted(product);

      logger.Trace("Entity was added");

      LogChangeInfo(product);
    }

    protected override void OnUpdated(Product product, Product oldValues)
    {
      base.OnUpdated(entity, oldValues);

      logger.Trace("Entity was modified");

      LogChangeInfo(product);
    }

    protected override void OnDeleted(Product product)
    {
      base.OnDeleted(product);

      logger.Trace("Entity was deleted");

      LogChangeInfo(product);
    }

    private void LogChangeInfo(Product product)
    {
      Console.WriteLine(Environment.NewLine);

      Console.WriteLine("Id: " + product.Id);
      Console.WriteLine("Name: " + product.Name);

      Console.WriteLine("#####");
      Console.WriteLine(Environment.NewLine);
    }
    
    protected override void OnError(Exception exception)
    {
      logSource.Error(exception);
    }
  }

//Program.cs

using System.Configuration;
using System.Reactive.Concurrency;

namespace SqlTableDependency.Extensions.Sample
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionString = ConfigurationManager.ConnectionStrings["FargoEntities"].ConnectionString;
      
      using var productsProvider = new ProductsSqlTableDependencyProvider(connectionString, ThreadPoolScheduler.Instance, new ConsoleLogger());
      productsProvider.SubscribeToEntityChanges();
      
      Console.ReadKey();
    }
  }
}

Joker.Redis

SqlServer PubSub notifications via Redis and SqlTableDependencyProvider extension

Install-Package Joker.Redis

Download and run redis-server (https://redis.io/download) or use Docker (see above).

Server side:

public class ProductSqlTableDependencyRedisProvider : SqlTableDependencyRedisProvider<Product>
{
  public ProductSqlTableDependencyRedisProvider(ISqlTableDependencyProvider<Product> sqlTableDependencyProvider, IRedisPublisher redisPublisher) 
    : base(sqlTableDependencyProvider, redisPublisher)
  {
  }
}
string redisUrl = ConfigurationManager.AppSettings["RedisUrl"]; //localhost

var redisPublisher = new RedisPublisher(redisUrl);
await redisPublisher.PublishAsync("messages", "hello");

using var productsProvider = new ProductsSqlTableDependencyProvider(connectionString, ThreadPoolScheduler.Instance, new ConsoleLogger());

using var productSqlTableDependencyRedisProvider = new ProductSqlTableDependencyRedisProvider(productsProvider, redisPublisher, ThreadPoolScheduler.Instance)
  .StartPublishing();

Client side:

private static async Task<RedisSubscriber> CreateRedisSubscriber(string redisUrl)
{
  var redisSubscriber = new RedisSubscriber(redisUrl);

  await redisSubscriber.Subscribe(channelMessage => { Console.WriteLine($"OnNext({channelMessage.Message})"); },
  "messages");

  await redisSubscriber.Subscribe(channelMessage =>
  {
    var recordChange = JsonConvert.DeserializeObject<RecordChangedNotification<Product>>(channelMessage.Message);
    Console.WriteLine($"OnNext Product changed({recordChange.ChangeType})");
    Console.WriteLine($"OnNext Product changed({recordChange.Entity.Id})");
  }, nameof(Product) + "-Changes");

  await redisSubscriber.Subscribe(channelMessage =>
  {
    var tableDependencyStatus = JsonConvert.DeserializeObject<TableDependencyStatus>(channelMessage.Message);
    Console.WriteLine($"OnNext tableDependencyStatus changed({tableDependencyStatus})");
  }, nameof(Product) + "-Status");

  redisSubscriber.WhenIsConnectedChanges.Subscribe(c => Console.WriteLine($"REDIS is connected: {c}"));

  return redisSubscriber;
}

How to put it all together

Try out samples

    private static void CreateReactiveProductsViewModel()
    {
      var reactiveData = new ReactiveData<Product>();
      var redisUrl = ConfigurationManager.AppSettings["RedisUrl"];
      using var entitiesSubscriber = new DomainEntitiesSubscriber<Product>(new RedisSubscriber(redisUrl), reactiveData);

      string connectionString = ConfigurationManager.ConnectionStrings["FargoEntities"].ConnectionString;

      var reactiveProductsViewModel = new ReactiveProductsViewModel(new SampleDbContext(connectionString),
        reactiveData, new WpfSchedulersFactory());

      reactiveProductsViewModel.SubscribeToDataChanges();

      reactiveProductsViewModel.Dispose();
    }

"Buy Me A Coffee"

About

Reactive data changes from SQL server to .NET clients. SqlTableDependency extensions, Joker.OData, Joker.Redis, Joker.MVVM and ksqlDB LINQ provider

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • C# 99.9%
  • Dockerfile 0.1%