牵着老婆满街逛

随便写点技术性的文章


Kratos微服务框架下实现CQRS架构模式

命令查询的责任分离Command Query Responsibility Segregation 通常被简化为 命令查询分离,即读写分离。

在特定的场景下,它可以提供更好的性能。但是,在强一致性方面,它并不能够保证。而且,还会带来认知负担。所以,实际运用上,需要谨慎。

什么是 CQRS

这个概念出自于 命令与查询分离(CQS, Command Query Separation),出自于1987 年 Bertrand Meyer 的 《面向对象软件构造》一书,其原始概念是我们可以把对象操作分为:命令(Command)和 查询(Query)两种形式。

命令查询的责任分离Command Query Responsibility Segregation (简称CQRS)模式是一种架构体系模式,能够使改变模型的状态的命令和模型状态的查询实现分离。

在单体应用时代,它是读写分离:

读写分离

而在微服务的时代,就变成了命令查询的责任分离:

命令查询的责任分离

读写分离解决了什么?

数据库的读写分离就是:将数据库分为了主从库,一个主库用于写数据,多个从库完成读数据的操作,主从库之间通过某种机制进行数据的同步。

大多数互联网业务,往往读多写少。这时候,数据库的读会首先成为数据库的瓶颈。这时,如果我们希望能够线性的提升数据库的读性能,消除读写锁冲突从而提升数据库的写性能,那么就可以使用读写分离的架构:主从,主主等。

MySQL用的最多的就是主从,主数据库通过BinLog同步到从数据库。这就产生了一个问题,数据不一致问题。如果写数据的压力很大,binlog就会拥塞,从库数据更新不及时,就会读到老旧的脏数据。所以这个方案局限了它的应用范围:只有对一致性要求不高的场景才好使。比如,日志查询,报表等。

使用 CQRS 有什么好处?

使用 CQRS 的主要好处包括:

  1. 灵活的扩展——读写数据存储可以根据需求独立扩展。
  2. 简单查询——由于读取和写入的数据模型是分开的,读取数据模型可以设计成避免复杂查询的方式。
  3. 提高性能——通过优化读取操作并将数据存储副本放置在不同的地理位置,读取繁重的操作可以显着提高性能。

使用 CQRS 的缺点是什么?

  1. 复杂性——事件驱动的系统构建和维护起来很复杂。
  2. 一致性——CQRS 和事件驱动架构旨在确保数据在所有涉及的系统中保持一致,但在消息失败的情况下,读取数据库将不同步。

实现CQRS

在这里讨论是物联网的时序数据的存取场景。

我们分为两个微服务:

日志查询服务(kratos.logger.service)

主要是开放了API用于查询数据库,获取日志数据。

日志写入服务(kratos.logger.job)

订阅Kafka的日志数据写入Topic,写入到时序数据库中去。

Docker部署开发服务器

TimeScaleDB

docker pull timescale/timescaledb:latest-pg14
docker pull timescale/timescaledb-postgis:latest-pg13
docker pull timescale/pg_prometheus:latest-pg11

docker run -itd \
    --name timescale-test \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=123456 \
    timescale/timescaledb-postgis:latest-pg13

Kafka

docker pull bitnami/kafka:latest
docker pull bitnami/zookeeper:latest
docker pull hlebalbau/kafka-manager:latest

docker run -itd \
    --name zookeeper-test \
    -p 2181:2181 \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest

docker run -itd \
    --name kafka-standalone \
    --link zookeeper-test \
    -p 9092:9092 \
    -v /home/data/kafka:/bitnami/kafka \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    --user root \
    bitnami/kafka:latest

docker run -itd \
     -p 9000:9000  \
     -e ZK_HOSTS="localhost:2181" \
     hlebalbau/kafka-manager:latest

Consul

docker pull bitnami/consul:latest

docker run -itd \
    --name consul-server-standalone \
    -p 8300:8300 \
    -p 8500:8500 \
    -p 8600:8600/udp \
    -e CONSUL_BIND_INTERFACE='eth0' \
    -e CONSUL_AGENT_MODE=server \
    -e CONSUL_ENABLE_UI=true \
    -e CONSUL_BOOTSTRAP_EXPECT=1 \
    -e CONSUL_CLIENT_LAN_ADDRESS=0.0.0.0 \
    bitnami/consul:latest

Jaeger

docker pull jaegertracing/all-in-one:latest

docker run -d \
    --name jaeger \
    -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
    -p 5775:5775/udp \
    -p 6831:6831/udp \
    -p 6832:6832/udp \
    -p 5778:5778 \
    -p 16686:16686 \
    -p 14268:14268 \
    -p 14250:14250 \
    -p 9411:9411 \
    jaegertracing/all-in-one:latest

测试

下载工具

进行测试

测试写

使用Offset Explorer 模拟设备,向 Topic logger.sensor.ts 发送JSON数据:

[{"ts": 1646409307, "sensor_id": 1, "temperature":30, "cpu":20}]

测试读

使用Postman向日志服务发起gRPC请求进行查询。

技术栈

实例代码

参考资料