PubSub
Request-Response场景
事件发起者明确需要知道请求的结果
sequenceDiagram
Caller->>+Server: send Request Body?
Server->>-Caller: reply Response
采用PubSub的方式来实现这种场景
方案1: 规则topic的方式
请求者发送请求到test主题,服务提供者消费到对应的请求后,处理相应的逻辑,并且将处理结果发送到test-response中
sequenceDiagram
Caller->>+MQ: send Request Body to test topic
MQ->>+Server: consume Request Body
Server->>Server: process the request and generate response
Server->>-MQ: send response to test-response
MQ->>-Caller: consume the response
方案2: 请求中带有response-topic的设定
请求者发送请求到test主题,并且带上response-topic=test2,服务提供者消费到对应的test的主题后,处理相应的逻辑,并且将结果写入到test2中。
sequenceDiagram
Caller->>+MQ: send Request Body(response-topic=test2) to test topic
MQ->>+Server: consume Request Body
Server->>Server: process the request and generate response
Server->>-MQ: send response to test2
MQ->>-Caller: consume the response
以下的示例采用JSON RPC的格式
登录Auth
- 鉴权请求
{
"jsonrpc": "2.0",
"method": "auth",
"params": {
"uid": "zhangchao",
"token": "123456"
},
"id": 1
}
- 鉴权返回
{
"jsonrpc": "2.0",
"result": {
"status": "true",
"token": "huanxin123456"
},
"id": 1
}
验证细节
- broadway 流式处理数据
- pubsub 返回数据给发起方
鉴权请求的时序图
sequenceDiagram
Caller->>+CallerService: request auth request
CallerService->>+MQ: send Auth(uid,pwd) to exim-auth
MQ->>+AuthService: consume Auth(uid,pwd) from exim-auth
AuthService->>AuthService: valid the Auth(uid,pwd)
AuthService->>-MQ: send AuthResult(success/fail) to exim-auth-response
MQ->>-CallerService: consume AuthResult(success/fail) from exim-auth-response
CallerService->>-Caller: consume the response
# 测试用例
result = Exim.PubSub.Request.auth("zhangchao", "123456")
# 由于演示的原因,所以没有进行分拆,CallerService和AuthService
# 1. 启动请求的kafka client
# 2. 启动response的消费请求的逻辑
# 3. 启动AuthService写入回应的kafka client
# 4. 启动CallerService消费请求结果的逻辑
# start request kafka client
def start_request() do
Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic ->
# start callerService kafka client
Exim.PubSub.Request.start_client(topic)
end)
end
# start kafka consume and response kafka client
def start_broadway() do
Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic ->
# add consumer for request
Exim.PubSub.PipelineManager.add_queue(topic)
# start authService kafka client
Exim.PubSub.Response.start_client(topic)
# add consumer for response
Exim.PubSub.PipelineManager.add_queue(Exim.PubSub.Response.response_topic(topic))
end)
end
# 请求的逻辑
# 1. 订阅请求ID
# 2. 发送请求到MQ
# 3. AuthService消费到请求
# 4. AuthService进行鉴权逻辑
# 5. AuthService写入鉴权结果
# 6. CallerService消费到鉴权结果
# 7. CallerService将结果通知给Caller
def auth(uid, token) do
auth_request = %{
method: "auth",
params: %{
uid: uid,
token: token
},
key: uid,
id: UUID.generate()
}
# sub the request id
PubSub.subscribe(Exim.PubSub, auth_request.id)
request(auth_request)
# wait for response
receive do
response ->
Logger.info("Received response: #{inspect(response)}")
response
end
end
# auth valid the auth request and give auth result to response topic
# 1. valid the token
# 2. send result to response topic
defp handle_message_internal(%{"method" => "auth"} = message) do
Logger.info("handle auth request, #{inspect(message)}")
response = message |> Map.put("topic", "exim-auth") |> Map.put("method", "result")
Exim.PubSub.Response.response(response)
end
# handle auth response
# send result to request process
defp handle_message_internal(%{"method" => "result", "id" => id} = message) do
Logger.info("handle auth response, #{inspect(message)}")
PubSub.broadcast(Exim.PubSub, id, message)
end
加入聊天室
TODO
发送消息(长连接)
-
基础参数的检测
比如消息是否在同一个appkey内,消息是否有from,有to字段等
-
业务合法性检查
- 客户业务逻辑判定
- 敏感词的检测
- 反垃圾的检测
- 群组聊天室等业务场景的检测
- 好友黑名单的检测
-
落盘消息
-
给消息发送者返回发送成功
-
路由消息的接收者,投递给消息的接收者
-
客户端确认消息接收成功
flowchart LR
Sender-->|基础检查|BasicCheck
BasicCheck-->|持久化数据|Persistent
Persistent-->|路由消息|RouterService
RouterService-->|推送消息|Receiver
异步MQ的topic的设计-服务级topic版本
该方案引入了AgentService,ValidService,ReplyService进行,服务中间采用的固定的topic队列进行消费
- 用户sender登录到AgentService
- 用户订阅topic用户级别,比如subscribe /sender/android
- sender发送消息到AgentService
- AgentService发送message到valid-msg-topic
- ValidService消费到valid-msg-topic的message数据
- ValidService完成消息合法性的检查
- ValidService将合法性检查的结果写入到valided-msg-topic
- ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android
- AgentService收到发送消息的检查结果,并把数据发送给sender客户端
flowchart LR
Sender-->|1 参数合法性检测|SenderAgentService
SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService
ValidService-->|3-1 发送结果(服务级对立,用户级分区)|ReplyService
ReplyService-->|4 返回发送者发送结果到Sender所在的节点|SenderAgentService
SenderAgentService-->|5 返回发送结果给客户端|Sender
ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent
ValidService-->|3-3 路由消息(服务级队列,用户级分区)|RouterService
RouterService-->|4 推送消息|AgentService
AgentService-->|5 发送消息到接收方|Receiver
sequenceDiagram
Sender->>+SenderAgentService: receive client message
SenderAgentService->>+MQ: pub message to (valid-msg-topic)
MQ->>+ValidService: consume message from valid-msg-topic
ValidService->>ValidService: valid message
ValidService->>+MQ: send valided msg to (valided-msg-topic)
MQ->>ReplyService: send reply to Sender
ReplyService->>SenderAgentService: route send_ack to ReceiveService
SenderAgentService->>-Sender: reply send_ack to Sender
MQ->>RouterService: route message to Receiver
RouterService->>AgentService: router msg to Receiver's AgentService
AgentService->>Receiver: send message to receiver
Receiver->>AgentService: ack message
异步MQ的topic的设计-用户级topic版本
该方案引入了AgentService,ValidService进行,主要的区别Agent能够直接消费到对应的数据
- 用户sender登录到AgentService
- 用户订阅topic用户级别,比如subscribe /sender/android
- sender发送消息到AgentService
- AgentService发送message到valid-msg-topic
- ValidService消费到valid-msg-topic的message数据
- ValidService完成消息合法性的检查
- ValidService将合法性检查的结果写入到valided-msg-topic
- ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android
- AgentService收到发送消息的检查结果,并把数据发送给sender客户端
flowchart LR
Sender-->|0 登录订阅用户级队列 /user/mobile |SenderAgentService
Sender-->|1 参数合法性检测|SenderAgentService
SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService
ValidService-->|3-1 发送结果(用户级队列 /user/mobile)|SenderAgentService
SenderAgentService-->|4 返回发送结果给客户端|Sender
ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent
ValidService-->|3-3 路由消息(用户级队列)|AgentService
AgentService-->|4 发送消息到接收方|Receiver
sequenceDiagram
Sender->>+SenderAgentService: login && subscribe /sender/mobile && /sender
SenderAgentService->>+MQ: pub message to (valid-msg-topic)
MQ->>+ValidService: consume message from valid-msg-topic
ValidService->>ValidService: valid message
ValidService->>+MQ: send valided msg to (/sender/mobile)
ValidService->>+MQ: route message to dest (/receiver || /receiver/mobile)
MQ->>SenderAgentService: send reply to Sender
SenderAgentService->>-Sender: reply send_ack to Sender
MQ->>AgentService: get msg to Receiver's AgentService
AgentService->>Receiver: send message to receiver
Receiver->>AgentService: ack message
REST发送消息
背景
当前发送消息,包括聊天室消息,群组消息,单人消息。
现状
flowchart LR
AppServer-->|0 发送系统消息 |Gateway
Gateway-->|1 验证token合法性| MessageService
MessageService-->|2 检查参数合法性 | MessageService
MessageService-->|3-1 返回消息发送成功|Gateway
Gateway-->|4-1 返回发送结果 | AppServer
MessageService -->|3-2 路由消息(此处过程同上面长连接过程) | RouterService
RouterService -->|4 发送消息到接收方|Receiver
sequenceDiagram
AppServer->>+Gateway: 0 发送系统消息
Gateway->>+MessageService: 1 验证token合法性
MessageService-->>MessageService: 2 检查参数合法性
MessageService-->-Gateway: 3-1 返回消息发送成功
Gateway-->-AppServer: 4-1 返回发送结果
MessageService ->>RouterService: 3-2 路由消息(此处过程同上面长连接过程)
RouterService ->>Receiver: 4 发送消息到接收方
IM服务接收到客户的请求后,进行接下来的操作
- 验证token的合法性,当前主要在gateway进行
- 参数合法性检查,确认必要的参数是否都有
- 返回发送结果给调用者,路径中包括gateway和AppServer
- 消息正常的后续投递过程
- 消息发送给消息的接收者
队列方案
IM服务接收到客户的请求后,进行接下来的操作
- 验证token的合法性,当前主要在gateway进行
- 参数合法性检查,确认必要的参数是否都有
- 消息写入到队列MQ(消息持久化)
- 返回发送结果给调用者,路径中包括gateway和AppServer
- 消息服消费到MQ中消息,进行后续的投递过程
- 消息服务发送给消息的接收者
flowchart LR
AppServer-->|0 发送系统消息 |Gateway
Gateway-->|1 验证token合法性| MessageService
MessageService-->|2 检查参数合法性 | MessageService
MessageService-->|3-1 消息写入队列|MQ
MessageService-->|3-2 返回消息发送成功|Gateway
Gateway-->|4-1 返回发送结果 | AppServer
MQ-->|4-2 消息读取队列|RouterService
RouterService-->|5 路由消息(此处过程同上面长连接过程) | AgentService
AgentService -->|6 发送消息到接收方|Receiver
sequenceDiagram
AppServer->>+Gateway: 0 发送系统消息
Gateway->>+MessageService: 1 验证token合法性
MessageService-->>MessageService: 2 检查参数合法性
MessageService->>MQ: 3-1 消息写入队列
MessageService-->>-Gateway: 3-2 返回消息发送成功
Gateway-->>-AppServer: 4-1 返回发送结果
MQ->>RouterService: 4-2 消息读取队列
RouterService->>AgentService: 5 路由消息(此处过程同上面长连接过程)
AgentService ->>Receiver: 6 发送消息到接收方
添加队列方案当前遗留问题
- 如何进行租户级别的隔离
- 队列的可靠性如何进行监控
- 延时会有所增加,是否在可接受范围内