Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

PubSub

docs/pubsub.livemd

PubSub

Request-Response场景

事件发起者明确需要知道请求的结果

sequenceDiagram
    Caller->>+Server: send Request Body?
    Server->>-Caller: reply Response

采用PubSub的方式来实现这种场景

方案1: 规则topic的方式

请求者发送请求到test主题,服务提供者消费到对应的请求后,处理相应的逻辑,并且将处理结果发送到test-response中

sequenceDiagram
    Caller->>+MQ: send Request Body to test topic
    MQ->>+Server: consume Request Body
    Server->>Server: process the request and generate response
    Server->>-MQ: send response to test-response
    MQ->>-Caller: consume the response

方案2: 请求中带有response-topic的设定

请求者发送请求到test主题,并且带上response-topic=test2,服务提供者消费到对应的test的主题后,处理相应的逻辑,并且将结果写入到test2中。

sequenceDiagram
    Caller->>+MQ: send Request Body(response-topic=test2) to test topic
    MQ->>+Server: consume Request Body
    Server->>Server: process the request and generate response
    Server->>-MQ: send response to test2
    MQ->>-Caller: consume the response

以下的示例采用JSON RPC的格式

登录Auth

  1. 鉴权请求
{
  "jsonrpc": "2.0",
  "method": "auth",
  "params": {
    "uid": "zhangchao",
    "token": "123456"
  },
  "id": 1
}
  1. 鉴权返回
{
  "jsonrpc": "2.0",
  "result": {
    "status": "true",
    "token": "huanxin123456"
  },
  "id": 1
}

验证细节

  • broadway 流式处理数据
  • pubsub 返回数据给发起方

鉴权请求的时序图

sequenceDiagram
    Caller->>+CallerService: request auth request 
    CallerService->>+MQ: send Auth(uid,pwd) to exim-auth
    MQ->>+AuthService: consume Auth(uid,pwd) from exim-auth
    AuthService->>AuthService: valid the Auth(uid,pwd)
    AuthService->>-MQ: send AuthResult(success/fail) to exim-auth-response
    MQ->>-CallerService: consume AuthResult(success/fail) from exim-auth-response
    CallerService->>-Caller: consume the response
# 测试用例

result = Exim.PubSub.Request.auth("zhangchao", "123456")
# 由于演示的原因,所以没有进行分拆,CallerService和AuthService
# 1. 启动请求的kafka client
# 2. 启动response的消费请求的逻辑
# 3. 启动AuthService写入回应的kafka client
# 4. 启动CallerService消费请求结果的逻辑

# start request kafka client
def start_request() do
  Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic ->
    # start callerService kafka client
    Exim.PubSub.Request.start_client(topic)
  end)
end

# start kafka consume and response kafka client
def start_broadway() do
  Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic ->
    # add consumer for request
    Exim.PubSub.PipelineManager.add_queue(topic)
    # start authService kafka client
    Exim.PubSub.Response.start_client(topic)
    # add consumer for response
    Exim.PubSub.PipelineManager.add_queue(Exim.PubSub.Response.response_topic(topic))
  end)
end
# 请求的逻辑
# 1. 订阅请求ID
# 2. 发送请求到MQ
# 3. AuthService消费到请求
# 4. AuthService进行鉴权逻辑
# 5. AuthService写入鉴权结果
# 6. CallerService消费到鉴权结果
# 7. CallerService将结果通知给Caller

def auth(uid, token) do
  auth_request = %{
    method: "auth",
    params: %{
      uid: uid,
      token: token
    },
    key: uid,
    id: UUID.generate()
  }

  # sub the request id
  PubSub.subscribe(Exim.PubSub, auth_request.id)
  request(auth_request)
  # wait for response
  receive do
    response ->
      Logger.info("Received response: #{inspect(response)}")
      response
  end
end


# auth valid the auth request and give auth result to response topic
# 1. valid the token
# 2. send result to response topic
defp handle_message_internal(%{"method" => "auth"} = message) do
  Logger.info("handle auth request, #{inspect(message)}")
  response = message |> Map.put("topic", "exim-auth") |> Map.put("method", "result")
  Exim.PubSub.Response.response(response)
end

# handle auth response
# send result to request process
defp handle_message_internal(%{"method" => "result", "id" => id} = message) do
  Logger.info("handle auth response, #{inspect(message)}")
  PubSub.broadcast(Exim.PubSub, id, message)
end

加入聊天室

TODO

发送消息(长连接)

  1. 基础参数的检测

    比如消息是否在同一个appkey内,消息是否有from,有to字段等

  2. 业务合法性检查

    • 客户业务逻辑判定
    • 敏感词的检测
    • 反垃圾的检测
    • 群组聊天室等业务场景的检测
    • 好友黑名单的检测
  3. 落盘消息

  4. 给消息发送者返回发送成功

  5. 路由消息的接收者,投递给消息的接收者

  6. 客户端确认消息接收成功

flowchart LR
    Sender-->|基础检查|BasicCheck
    BasicCheck-->|持久化数据|Persistent
    Persistent-->|路由消息|RouterService
    RouterService-->|推送消息|Receiver

异步MQ的topic的设计-服务级topic版本

该方案引入了AgentService,ValidService,ReplyService进行,服务中间采用的固定的topic队列进行消费

  1. 用户sender登录到AgentService
  2. 用户订阅topic用户级别,比如subscribe /sender/android
  3. sender发送消息到AgentService
  4. AgentService发送message到valid-msg-topic
  5. ValidService消费到valid-msg-topic的message数据
  6. ValidService完成消息合法性的检查
  7. ValidService将合法性检查的结果写入到valided-msg-topic
  8. ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android
  9. AgentService收到发送消息的检查结果,并把数据发送给sender客户端
flowchart LR
    Sender-->|1 参数合法性检测|SenderAgentService
    SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService
    ValidService-->|3-1 发送结果(服务级对立,用户级分区)|ReplyService
    ReplyService-->|4 返回发送者发送结果到Sender所在的节点|SenderAgentService
    SenderAgentService-->|5 返回发送结果给客户端|Sender
    ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent
    ValidService-->|3-3 路由消息(服务级队列,用户级分区)|RouterService
    RouterService-->|4 推送消息|AgentService
    AgentService-->|5 发送消息到接收方|Receiver
sequenceDiagram
    Sender->>+SenderAgentService: receive client message 
    SenderAgentService->>+MQ: pub message to (valid-msg-topic)
    MQ->>+ValidService: consume message from valid-msg-topic
    ValidService->>ValidService: valid message
    ValidService->>+MQ: send valided msg to (valided-msg-topic)
    MQ->>ReplyService: send reply to Sender
    ReplyService->>SenderAgentService: route send_ack to ReceiveService
    SenderAgentService->>-Sender: reply send_ack to Sender
    MQ->>RouterService: route message to Receiver
    RouterService->>AgentService: router msg to Receiver's AgentService
    AgentService->>Receiver: send message to receiver
    Receiver->>AgentService: ack message

异步MQ的topic的设计-用户级topic版本

该方案引入了AgentService,ValidService进行,主要的区别Agent能够直接消费到对应的数据

  1. 用户sender登录到AgentService
  2. 用户订阅topic用户级别,比如subscribe /sender/android
  3. sender发送消息到AgentService
  4. AgentService发送message到valid-msg-topic
  5. ValidService消费到valid-msg-topic的message数据
  6. ValidService完成消息合法性的检查
  7. ValidService将合法性检查的结果写入到valided-msg-topic
  8. ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android
  9. AgentService收到发送消息的检查结果,并把数据发送给sender客户端
flowchart LR
    Sender-->|0 登录订阅用户级队列 /user/mobile |SenderAgentService
    Sender-->|1 参数合法性检测|SenderAgentService
    SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService
    ValidService-->|3-1 发送结果(用户级队列 /user/mobile)|SenderAgentService
    SenderAgentService-->|4 返回发送结果给客户端|Sender
    ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent
    ValidService-->|3-3 路由消息(用户级队列)|AgentService
    AgentService-->|4 发送消息到接收方|Receiver
sequenceDiagram
    Sender->>+SenderAgentService: login && subscribe /sender/mobile && /sender
    SenderAgentService->>+MQ: pub message to (valid-msg-topic)
    MQ->>+ValidService: consume message from valid-msg-topic
    ValidService->>ValidService: valid message
    ValidService->>+MQ: send valided msg to (/sender/mobile)
    ValidService->>+MQ: route message to dest (/receiver || /receiver/mobile)
    MQ->>SenderAgentService: send reply to Sender
    SenderAgentService->>-Sender: reply send_ack to Sender
    MQ->>AgentService: get msg to Receiver's AgentService
    AgentService->>Receiver: send message to receiver
    Receiver->>AgentService: ack message

REST发送消息

背景

当前发送消息,包括聊天室消息,群组消息,单人消息。

现状

flowchart LR
    AppServer-->|0 发送系统消息 |Gateway
    Gateway-->|1 验证token合法性| MessageService
    MessageService-->|2 检查参数合法性 | MessageService
    MessageService-->|3-1 返回消息发送成功|Gateway
    Gateway-->|4-1 返回发送结果 | AppServer
    MessageService -->|3-2 路由消息(此处过程同上面长连接过程) | RouterService
    RouterService -->|4 发送消息到接收方|Receiver
sequenceDiagram
    AppServer->>+Gateway: 0 发送系统消息
    Gateway->>+MessageService: 1 验证token合法性
    MessageService-->>MessageService: 2 检查参数合法性
    MessageService-->-Gateway: 3-1 返回消息发送成功
    Gateway-->-AppServer: 4-1 返回发送结果
    MessageService ->>RouterService: 3-2 路由消息(此处过程同上面长连接过程)
    RouterService ->>Receiver: 4 发送消息到接收方

IM服务接收到客户的请求后,进行接下来的操作

  1. 验证token的合法性,当前主要在gateway进行
  2. 参数合法性检查,确认必要的参数是否都有
  3. 返回发送结果给调用者,路径中包括gateway和AppServer
  4. 消息正常的后续投递过程
  5. 消息发送给消息的接收者

队列方案

IM服务接收到客户的请求后,进行接下来的操作

  1. 验证token的合法性,当前主要在gateway进行
  2. 参数合法性检查,确认必要的参数是否都有
  3. 消息写入到队列MQ(消息持久化)
  4. 返回发送结果给调用者,路径中包括gateway和AppServer
  5. 消息服消费到MQ中消息,进行后续的投递过程
  6. 消息服务发送给消息的接收者
flowchart LR
    AppServer-->|0 发送系统消息 |Gateway
    Gateway-->|1 验证token合法性| MessageService
    MessageService-->|2 检查参数合法性 | MessageService
    MessageService-->|3-1 消息写入队列|MQ
    MessageService-->|3-2 返回消息发送成功|Gateway
    Gateway-->|4-1 返回发送结果 | AppServer
    MQ-->|4-2 消息读取队列|RouterService
    RouterService-->|5 路由消息(此处过程同上面长连接过程) | AgentService
    AgentService -->|6 发送消息到接收方|Receiver
sequenceDiagram
    AppServer->>+Gateway: 0 发送系统消息
    Gateway->>+MessageService: 1 验证token合法性
    MessageService-->>MessageService: 2 检查参数合法性
    MessageService->>MQ: 3-1 消息写入队列
    MessageService-->>-Gateway: 3-2 返回消息发送成功
    Gateway-->>-AppServer: 4-1 返回发送结果
    MQ->>RouterService: 4-2 消息读取队列
    RouterService->>AgentService: 5 路由消息(此处过程同上面长连接过程)
    AgentService ->>Receiver: 6 发送消息到接收方

添加队列方案当前遗留问题

  1. 如何进行租户级别的隔离
  2. 队列的可靠性如何进行监控
  3. 延时会有所增加,是否在可接受范围内