.NET Core event bus, distributed transaction solution: CAP

Database code

CREATE DATABASE OrderTest;
GO
USE OrderTest;
GO
CREATE TABLE Product(
	Id INT PRIMARY KEY IDENTITY,
	ProductName VARCHAR(30) NOT NULL,
	Price INT NOT NULL,
	Stock INT NOT NULL
);
CREATE TABLE [Order]
(
	Id INT PRIMARY KEY IDENTITY,
	ProductId INT NOT NULL,
	UserName VARCHAR(30) NOT NULL,
	Quantity INT NOT NULL
);
GO
INSERT INTO dbo.Product
(
    ProductName,
    Price,
    Stock
)
VALUES ('clothing', 30, 30), ('shoe', 20, 20), ('hat', 10, 10), ('coat', 25, 30);

Official website: https://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/

Background

Building a distributed application sooner or later raises the problem of distributed transactions. CAP was created to address it.

https://github.com/dotnetcore

Introduction to CAP

Github: https://github.com/dotnetcore/CAP

Open Source License: MIT

CAP is an open source C# library that implements an event bus and eventual consistency (distributed transactions) for distributed systems (SOA, microservices). It is lightweight, high-performance, and easy to use.

You can easily introduce CAP into distributed systems based on .NET Core technologies, including but not limited to ASP.NET Core and ASP.NET Core on .NET Framework.

CAP is provided as a NuGet package and does not intrude on your project, so you can still build distributed systems the way you like.

CAP has all the features of Event Bus, and CAP provides a more simplified way to handle pub/sub in EventBus.

CAP persists messages, so a message stays reliable even when your service restarts or goes down.

CAP implements eventual consistency in distributed transactions, and you no longer have to deal with these trivial details.

CAP provides API services based on Microsoft DI. It integrates seamlessly with your ASP.NET Core system and with your business code, and supports strongly consistent transaction processing.

CAP is open source and free. It is released under the MIT License, so you can use it in private or commercial projects at no charge.
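
The message persistence and eventual consistency described above are commonly achieved with a local message table (often called an outbox): the message is saved together with the business data, and a separate step forwards it to the broker later. The following is a minimal in-memory sketch of that idea, not CAP's actual implementation. All type names (OutboxMessage, InMemoryStore, OutboxDispatcher) are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One row in the hypothetical local message table (the "outbox").
public sealed class OutboxMessage
{
    public string Topic { get; }
    public string Payload { get; }
    public bool Sent { get; set; }

    public OutboxMessage(string topic, string payload)
    {
        Topic = topic;
        Payload = payload;
    }
}

// Stands in for a database. In a real system both writes below would be
// committed in one local transaction, so they succeed or fail together.
public sealed class InMemoryStore
{
    public List<string> Orders { get; } = new List<string>();
    public List<OutboxMessage> Outbox { get; } = new List<OutboxMessage>();

    public void SaveOrderWithMessage(string order, OutboxMessage message)
    {
        Orders.Add(order);
        Outbox.Add(message);
    }
}

// Forwards unsent rows to the broker and marks a row as sent only after
// the send succeeds, so a broker outage delays messages instead of losing them.
public sealed class OutboxDispatcher
{
    private readonly InMemoryStore _store;
    private readonly Func<OutboxMessage, bool> _send;

    public OutboxDispatcher(InMemoryStore store, Func<OutboxMessage, bool> send)
    {
        _store = store;
        _send = send;
    }

    // Returns the number of messages delivered in this pass.
    public int DispatchPending()
    {
        var delivered = 0;
        foreach (var message in _store.Outbox.Where(m => !m.Sent).ToList())
        {
            if (_send(message))
            {
                message.Sent = true;
                delivered++;
            }
        }
        return delivered;
    }
}
```

In this sketch, a dispatcher pass that runs while the broker is unavailable delivers nothing and leaves the row unsent, and a later pass delivers it once the send succeeds. This is why the order and its message end up consistent eventually rather than immediately.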

Getting Started

CAP currently supports RabbitMQ, Kafka, Azure Service Bus, and others as the underlying message transport. You do not need prior experience with these message queues to integrate them into your project.

CAP currently supports projects that use SQL Server, MySQL, PostgreSQL, or MongoDB databases.

CAP supports both EntityFrameworkCore and ADO.NET projects, and you can choose different configuration methods according to your needs.

The following is an incomplete schematic diagram of CAP in the system:

The solid line part in the figure represents the user code, and the dotted line part represents the internal implementation of the CAP.

Next, let's take a look at how CAP is integrated into the project:

Step 1:

You can run the following command to install the CAP NuGet package:

PM> Install-Package DotNetCore.CAP

Depending on the underlying message queue, you can choose to import different packages:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

Depending on the database you use (SQL Server, MySQL, PostgreSQL, or MongoDB), you can choose to import different packages:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //Requires MongoDB 4.0+ cluster

Step 2:

In the Startup.cs file, add the following configuration:

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();

    var sqlConnection = Configuration.GetConnectionString("OrderTestConnection");
    services.AddDbContext<OrderDbContext>(p => p.UseSqlServer(sqlConnection));

    // Bind the "RabbitMQ" section of appsettings.json to RabbitMQOptions
    var rabbitOptions = new RabbitMQOptions();
    Configuration.GetSection("RabbitMQ").Bind(rabbitOptions);

    services.AddCap(p =>
    {
        // Use the database behind OrderDbContext for CAP's message storage.
        // With UseEntityFramework, a separate p.UseSqlServer(sqlConnection) call is not needed.
        p.UseEntityFramework<OrderDbContext>();
        p.UseRabbitMQ(mq =>
        {
            mq.HostName = rabbitOptions.HostName;
            mq.VirtualHost = rabbitOptions.VirtualHost;
            mq.UserName = rabbitOptions.UserName;
            mq.Password = rabbitOptions.Password;
            mq.Port = rabbitOptions.Port;
        });
    });
}
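
The configuration above refers to an OrderDbContext that the article never shows. A minimal sketch of what it might look like, assuming EF Core entity classes that mirror the Product and [Order] tables from the database code (the class shapes are an assumption inferred from the table columns and from how the controllers use _context.Order and _context.Product):

```csharp
using Microsoft.EntityFrameworkCore;

// Assumed entity mirroring the Product table.
public class Product
{
    public int Id { get; set; }
    public string ProductName { get; set; }
    public int Price { get; set; }
    public int Stock { get; set; }
}

// Assumed entity mirroring the [Order] table.
public class Order
{
    public int Id { get; set; }
    public int ProductId { get; set; }
    public string UserName { get; set; }
    public int Quantity { get; set; }
}

public class OrderDbContext : DbContext
{
    public OrderDbContext(DbContextOptions<OrderDbContext> options)
        : base(options)
    {
    }

    // The DbSet property names match the table names,
    // so EF Core maps them to Product and [Order] by convention.
    public DbSet<Product> Product { get; set; }
    public DbSet<Order> Order { get; set; }
}
```

The constructor that takes DbContextOptions<OrderDbContext> is what lets services.AddDbContext<OrderDbContext>(...) supply the SQL Server connection.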

Step 3:

appsettings.json file configuration:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "OrderTestConnection": "server=.;uid=sa;pwd=123456;database=ordertest;"
  },
  "RabbitMQ": {
    "HostName": "172.16.20.157",
    "VirtualHost": "/",
    "UserName": "admin",
    "Password": "admin",
    "Port": 5672
  }
}

Publish events/messages

Inject ICapPublisher into the Controller and use it to publish messages:

using System;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

public class OrderController : Controller
{
    private readonly ICapPublisher _capBus;
    private readonly OrderDbContext _context;

    public OrderController(ICapPublisher capBus, OrderDbContext context)
    {
        _capBus = capBus;
        _context = context;
    }

    public IActionResult CreateOrder()
    {
        return View();
    }

    [HttpPost]
    public IActionResult Submit(Order order)
    {
        // Start a transaction that covers both the order and the CAP message
        using var tran = _context.Database.BeginTransaction(_capBus);
        try
        {
            _context.Order.Add(order);
            _context.SaveChanges();

            // Publish the message; it becomes visible only if the commit succeeds
            _capBus.Publish("order.create", order);

            tran.Commit();
            return Redirect("/home/index");
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
            tran.Rollback();
            return BadRequest(e.Message);
        }
    }
}

Subscribe to events/messages

In Controller:
If the subscriber method is in a Controller, add [CapSubscribe("")] to it directly to subscribe to the related messages.

using System.Linq;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

public class ConsumerController : Controller
{
    private readonly OrderDbContext _context;

    public ConsumerController(OrderDbContext context)
    {
        _context = context;
    }

    [CapSubscribe(name: "order.create")] // Must match the name used by the publisher
    public void OrderCreateConsumer(Order order)
    {
        // Reduce inventory
        var product = _context.Product.FirstOrDefault(p => p.Id == order.ProductId);
        if (product == null)
        {
            // Unknown product: there is no stock to update
            return;
        }

        product.Stock -= order.Quantity;
        _context.SaveChanges();
    }
}

In xxxService:
If your method is not in a Controller, the subscribing class must implement ICapSubscribe, and the method must carry the [CapSubscribe("")] attribute:

using System;
using DotNetCore.CAP;

namespace xxx.Service
{
    public interface ISubscriberService
    {
        void CheckReceivedMessage(DateTime time);
    }

    public class SubscriberService : ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("xxxx.services.show.time")]
        public void CheckReceivedMessage(DateTime time)
        {
            // Handle the received message here
        }
    }
}

Then register your ISubscriberService class in ConfigureServices() in Startup.cs:

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}
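
To see the subscriber above receive something, a message has to be published under the same name. A minimal sketch, assuming a hypothetical TimePublisher class (not part of the original article) that gets ICapPublisher through constructor injection:

```csharp
using System;
using DotNetCore.CAP;

// Hypothetical helper class, for illustration only.
public class TimePublisher
{
    private readonly ICapPublisher _capBus;

    public TimePublisher(ICapPublisher capBus)
    {
        _capBus = capBus;
    }

    public void PublishCurrentTime()
    {
        // The name must match the one in [CapSubscribe("xxxx.services.show.time")]
        _capBus.Publish("xxxx.services.show.time", DateTime.Now);
    }
}
```

Because the payload here is a DateTime, it lines up with the DateTime parameter of CheckReceivedMessage.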

That's all. Easy, isn't it?

Exception resolution

RabbitMQ : SubscriberNotFoundException #63

https://github.com/dotnetcore/CAP/issues/63

Tags: Database Distribution .NET

Posted by busyguy78 on Mon, 11 Jul 2022 06:32:41 +0930