问答平台(5),发送系统通知

问题背景

触发事件-图示

1
2
在项目中,有一些不需要实时执行但是非常频繁的操作。
为了提升网站的性能,可以使用异步消息的形式进行发送,使用消息队列服务器 kafka 来实现。

发送系统通知

评论,点赞,关注等事件频繁发生,但是发送系统通知不需要立刻执行。

触发事件

  • 评论后,发布通知
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // CommentController.java
    @Autowired
    private EventProducer eventProducer;

    @Autowired
    private DiscussPostService discussPostService;

    @RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
    comment.setUserId(hostHolder.getUser().getId());
    comment.setStatus(0);
    comment.setCreateTime(new Date());
    commentService.addComment(comment);

    // 触发评论事件
    Event event = new Event()
    .setTopic(TOPIC_COMMENT)
    .setUserId(hostHolder.getUser().getId())
    .setEntityType(comment.getEntityType())
    .setEntityId(comment.getEntityId())
    .setData("postId", discussPostId);
    if (comment.getEntityType() == ENTITY_TYPE_POST) {
    DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
    event.setEntityUserId(target.getUserId());
    } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
    Comment target = commentService.findCommentById(comment.getEntityId());
    event.setEntityUserId(target.getUserId());
    }
    eventProducer.fireEvent(event);

    return "redirect:/discuss/detail/" + discussPostId;
    }
  • 点赞后,发布通知
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // LikeController.java
    // 重构: like()
    @RequestMapping(path = "/like", method = RequestMethod.POST)
    @ResponseBody
    public String like(int entityType, int entityId, int entityUserId, int postId) {
    User user = hostHolder.getUser();

    // 点赞
    likeService.like(user.getId(), entityType, entityId, entityUserId);
    // 数量
    long likeCount = likeService.findEntityLikeCount(entityType, entityId);
    // 状态
    int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);
    // 返回的结果
    Map<String, Object> map = new HashMap<>();
    map.put("likeCount", likeCount);
    map.put("likeStatus", likeStatus);

    // 触发点赞事件
    if (likeStatus == 1) {
    Event event = new Event()
    .setTopic(TOPIC_LIKE)
    .setUserId(hostHolder.getUser().getId())
    .setEntityType(entityType)
    .setEntityId(entityId)
    .setEntityUserId(entityUserId)
    .setData("postId", postId);
    eventProducer.fireEvent(event);
    }

    return CommunityUtil.getJSONString(0, null, map);
    }
  • 关注后,发布通知
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // FollowController.java
    @RequestMapping(path = "/follow", method = RequestMethod.POST)
    @ResponseBody
    public String follow(int entityType, int entityId) {
    User user = hostHolder.getUser();

    followService.follow(user.getId(), entityType, entityId);

    // 触发关注事件
    Event event = new Event()
    .setTopic(TOPIC_FOLLOW)
    .setUserId(hostHolder.getUser().getId())
    .setEntityType(entityType)
    .setEntityId(entityId)
    .setEntityUserId(entityId);
    eventProducer.fireEvent(event);

    return CommunityUtil.getJSONString(0, "已关注!");
    }

处理事件

  • 封装事件对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    // 实体类
    // Event.java
    public class Event {

    private String topic;
    private int userId;
    private int entityType;
    private int entityId;
    private int entityUserId;
    private Map<String, Object> data = new HashMap<>();

    public String getTopic() {
    return topic;
    }

    public Event setTopic(String topic) {
    this.topic = topic;
    return this;
    }

    public int getUserId() {
    return userId;
    }

    public Event setUserId(int userId) {
    this.userId = userId;
    return this;
    }

    public int getEntityType() {
    return entityType;
    }

    public Event setEntityType(int entityType) {
    this.entityType = entityType;
    return this;
    }

    public int getEntityId() {
    return entityId;
    }

    public Event setEntityId(int entityId) {
    this.entityId = entityId;
    return this;
    }

    public int getEntityUserId() {
    return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
    this.entityUserId = entityUserId;
    return this;
    }

    public Map<String, Object> getData() {
    return data;
    }

    public Event setData(String key, Object value) {
    this.data.put(key, value);
    return this;
    }
    }
  • 开发事件的生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 包: event
    // EventProducer.java
    @Component
    public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event) {
    // 将事件发布到指定的主题
    kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
    }
  • 开发事件的消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 包: event
    // EventConsumer.java
    @Component
    public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
    if (record == null || record.value() == null) {
    logger.error("消息的内容为空!");
    return;
    }

    Event event = JSONObject.parseObject(record.value().toString(), Event.class);
    if (event == null) {
    logger.error("消息格式错误!");
    return;
    }

    // 发送站内通知
    Message message = new Message();
    message.setFromId(SYSTEM_USER_ID);
    message.setToId(event.getEntityUserId());
    message.setConversationId(event.getTopic());
    message.setCreateTime(new Date());

    Map<String, Object> content = new HashMap<>();
    content.put("userId", event.getUserId());
    content.put("entityType", event.getEntityType());
    content.put("entityId", event.getEntityId());

    if (!event.getData().isEmpty()) {
    for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
    content.put(entry.getKey(), entry.getValue());
    }
    }

    message.setContent(JSONObject.toJSONString(content));
    messageService.addMessage(message);
    }
    }

页面

  • discuss-detail.html
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <!-- 内容 -->
    <!-- 作者 -->
    <li class="d-inline ml-2">
    <a href="javascript:;" th:onclick="|like(this,1,${post.id},${post.userId},${post.id});|" class="text-primary">
    <b th:text="${likeStatus==1?'已赞':'赞'}"></b> <i th:text="${likeCount}">11</i>
    </a>
    </li>
    <!-- 回帖列表 -->
    <!-- 第1条回帖 -->
    <li class="d-inline ml-2">
    <a href="javascript:;" th:onclick="|like(this,2,${cvo.comment.id},${cvo.comment.userId},${post.id});|" class="text-primary">
    <b th:text="${cvo.likeStatus==1?'已赞':'赞'}"></b> (<i th:text="${cvo.likeCount}">1</i>)
    </a>
    </li>
    <!-- 回复列表 -->
    <!-- 第1条回复 -->
    <li class="d-inline ml-2">
    <a href="javascript:;" th:onclick="|like(this,2,${rvo.reply.id},${rvo.reply.userId},${post.id});|" class="text-primary">
    <b th:text="${rvo.likeStatus==1?'已赞':'赞'}"></b> (<i th:text="${rvo.likeCount}">1</i>)
    </a>
    </li>
  • discuss.js: 重构,like()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    function like(btn, entityType, entityId, entityUserId, postId) {
    $.post(
    CONTEXT_PATH + "/like",
    {"entityType": entityType, "entityId": entityId, "entityUserId": entityUserId, "postId": postId},
    function (data) {
    data = $.parseJSON(data);
    if (data.code == 0) {
    $(btn).children("i").text(data.likeCount);
    $(btn).children("b").text(data.likeStatus == 1 ? '已赞' : '赞');
    } else {
    alert(data.msg);
    }
    }
    );
    }

异常

  • NullPointerException
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // ServiceLogAspect.java
    @Before("pointcut()")
    public void before(JoinPoint joinPoint) {
    // 用户[1,2,3,4],在[xxx],访问了[com.nowcoder.community.service.xxx]
    ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
    if (attributes == null) {
    return;
    }
    HttpServletRequest request = attributes.getRequest();
    String ip = request.getRemoteHost();
    String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    String target = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
    logger.info(String.format("用户[%s], 在[%s], 访问了[%s].", ip, now, target));
    }

结果展示

发送系统通知-图示

tips

1
2
Kafka 在 windows 启动后,可能出现 kafka-logs 文件夹被锁死,删除后重启即可。
Kafka 在 Linux 系统很稳定。

参考资料


问答平台(5),发送系统通知
https://lcf163.github.io/2020/06/03/问答平台(5),发送系统通知/
作者
乘风的小站
发布于
2020年6月3日
许可协议