Commit 240f7243 by 段启岩

更新消息队列监听器架构

parent cb508aec
package cn.meteor.beyondclouds.core.listener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeType;
import cn.meteor.beyondclouds.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.Optional;
/**
* 数据更新监听器
* 监听本系统所有的数据更新 操作:更新操作包括
* @author meteor
*/
@Slf4j
public class DataItemChangeListener implements TopicListener {
@Override
@KafkaListener(topics = "${beyondclouds.kafka.topics.search-item-update}")
public final void onMessage(ConsumerRecord<?, String> record) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
DataItemChangeMessage dataItemChangeMessage;
try {
dataItemChangeMessage = JsonUtils.toBean(kafkaMessage.get(), DataItemChangeMessage.class);
log.debug("接收到kafka消息:{}", dataItemChangeMessage.toString());
// 调用对应的消息处理函数
DataItemChangeType changeType = dataItemChangeMessage.getChangeType();
switch (changeType) {
case ADD:
onDataItemAdd(dataItemChangeMessage);
break;
case DELETE:
onDataItemDelete(dataItemChangeMessage);
break;
case UPDATE:
onDataItemUpdate(dataItemChangeMessage);
break;
case USER_NICK_UPDATE:
onUserNickUpdate(dataItemChangeMessage);
break;
case USER_AVATAR_UPDATE:
onUserAvatarUpdate(dataItemChangeMessage);
break;
}
} catch (Exception e) {
e.printStackTrace();
log.error("DataItemUpdateMessage consume failed:{}", e.getMessage());
}
}
}
/**
* 有新数据添加到数据库
* @param dataItemChangeMessage
*/
public void onDataItemAdd(DataItemChangeMessage dataItemChangeMessage) {
}
/**
* 有数据从数据库删除
* @param dataItemChangeMessage
*/
public void onDataItemDelete(DataItemChangeMessage dataItemChangeMessage) {
}
/**
* 数据库里面的数据更新
* @param dataItemChangeMessage
*/
public void onDataItemUpdate(DataItemChangeMessage dataItemChangeMessage) {
}
/**
* 用户昵称更新
* @param dataItemChangeMessage
*/
public void onUserNickUpdate(DataItemChangeMessage dataItemChangeMessage) {
}
/**
* 用户头像更新
* @param dataItemChangeMessage
*/
public void onUserAvatarUpdate(DataItemChangeMessage dataItemChangeMessage) {
}
}
package cn.meteor.beyondclouds.core.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Topic 消息监听器
* 监听Kafka队列里面的消息
* @author meteor
*/
public interface TopicListener {
/**
* 订阅的topic有新的消息
* @param record
*/
void onMessage(ConsumerRecord<?, String> record);
}
package cn.meteor.beyondclouds.core.listener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.io.Serializable;
import java.util.Optional;
/**
* @author meteor
*/
@Slf4j
public class TopicMessageListener {
@KafkaListener(topics = "${beyondclouds.kafka.topics.search-item-update}")
public final void itemUpdate(ConsumerRecord<?, String> record) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
DataItemUpdateMessage itemUpdateMessage;
try {
itemUpdateMessage = JsonUtils.toBean(kafkaMessage.get(), DataItemUpdateMessage.class);
log.debug("接收到kafka消息:{}", itemUpdateMessage.toString());
// 调用消息处理函数
onItemUpdate(itemUpdateMessage);
} catch (Exception e) {
e.printStackTrace();
log.error("DataItemUpdateMessage consume failed:{}", e.getMessage());
}
}
}
/**
* 数据更新事件
* @param dataItemUpdateMessage
*/
public void onItemUpdate(DataItemUpdateMessage dataItemUpdateMessage) throws Exception {
}
}
......@@ -12,7 +12,7 @@ import cn.meteor.beyondclouds.modules.blog.exception.BlogCategoryServiceExceptio
import cn.meteor.beyondclouds.modules.blog.exception.BlogServiceException;
import cn.meteor.beyondclouds.modules.blog.mapper.BlogMapper;
import cn.meteor.beyondclouds.modules.blog.service.*;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.tag.entity.Tag;
......@@ -154,7 +154,7 @@ public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IB
// 5.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.addMessage(SearchItemType.BLOG, blog.getBlogId())
DataItemChangeMessage.addMessage(SearchItemType.BLOG, blog.getBlogId())
);
}
......@@ -204,7 +204,7 @@ public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IB
// 5.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.deleteMessage(SearchItemType.BLOG, blog.getBlogId())
DataItemChangeMessage.deleteMessage(SearchItemType.BLOG, blog.getBlogId())
);
}
......@@ -379,7 +379,7 @@ public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IB
// 5.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.updateMessage(SearchItemType.BLOG, blog.getBlogId())
DataItemChangeMessage.updateMessage(SearchItemType.BLOG, blog.getBlogId())
);
}
......
......@@ -12,7 +12,7 @@ import cn.meteor.beyondclouds.modules.project.mapper.ProjectMapper;
import cn.meteor.beyondclouds.modules.project.service.IProjectCommentService;
import cn.meteor.beyondclouds.modules.project.service.IProjectExtService;
import cn.meteor.beyondclouds.modules.project.service.IProjectService;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.user.entity.User;
......@@ -102,7 +102,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> impl
// 4.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.addMessage(SearchItemType.PROJECT, project.getProjectId())
DataItemChangeMessage.addMessage(SearchItemType.PROJECT, project.getProjectId())
);
}
......@@ -139,7 +139,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> impl
// 4.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.deleteMessage(SearchItemType.PROJECT, project.getProjectId())
DataItemChangeMessage.deleteMessage(SearchItemType.PROJECT, project.getProjectId())
);
}
......@@ -218,7 +218,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project> impl
// 4.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.updateMessage(SearchItemType.PROJECT, project.getProjectId())
DataItemChangeMessage.updateMessage(SearchItemType.PROJECT, project.getProjectId())
);
}
......
......@@ -9,7 +9,7 @@ import cn.meteor.beyondclouds.modules.question.exception.QuestionTagServiceExcep
import cn.meteor.beyondclouds.modules.question.mapper.QuestionMapper;
import cn.meteor.beyondclouds.modules.question.service.*;
import cn.meteor.beyondclouds.modules.question.util.QuestionUtils;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.tag.entity.Tag;
......@@ -148,7 +148,7 @@ public class QuestionServiceImpl extends ServiceImpl<QuestionMapper, Question> i
// 6.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.addMessage(SearchItemType.QUESTION, question.getQuestionId())
DataItemChangeMessage.addMessage(SearchItemType.QUESTION, question.getQuestionId())
);
}
......@@ -189,7 +189,7 @@ public class QuestionServiceImpl extends ServiceImpl<QuestionMapper, Question> i
// 9.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.deleteMessage(SearchItemType.QUESTION, question.getQuestionId())
DataItemChangeMessage.deleteMessage(SearchItemType.QUESTION, question.getQuestionId())
);
}
......@@ -238,7 +238,7 @@ public class QuestionServiceImpl extends ServiceImpl<QuestionMapper, Question> i
// 6.发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.updateMessage(SearchItemType.QUESTION, question.getQuestionId())
DataItemChangeMessage.updateMessage(SearchItemType.QUESTION, question.getQuestionId())
);
}
......
......@@ -14,24 +14,24 @@ import java.io.Serializable;
@Data
@ToString
@NoArgsConstructor
public class DataItemUpdateMessage {
public class DataItemChangeMessage {
public DataItemUpdateMessage(SearchItemUpdateType updateType, SearchItemType itemType, Serializable itemId) {
public DataItemChangeMessage(DataItemChangeType changeType, SearchItemType itemType, Serializable itemId) {
this.itemId = itemId;
this.itemType = itemType;
this.updateType = updateType;
this.changeType = changeType;
}
public static DataItemUpdateMessage addMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemUpdateMessage(SearchItemUpdateType.ADD, itemType, itemId);
public static DataItemChangeMessage addMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemChangeMessage(DataItemChangeType.ADD, itemType, itemId);
}
public static DataItemUpdateMessage deleteMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemUpdateMessage(SearchItemUpdateType.DELETE, itemType, itemId);
public static DataItemChangeMessage deleteMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemChangeMessage(DataItemChangeType.DELETE, itemType, itemId);
}
public static DataItemUpdateMessage updateMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemUpdateMessage(SearchItemUpdateType.UPDATE, itemType, itemId);
public static DataItemChangeMessage updateMessage(SearchItemType itemType, Serializable itemId) {
return new DataItemChangeMessage(DataItemChangeType.UPDATE, itemType, itemId);
}
/**
......@@ -45,7 +45,7 @@ public class DataItemUpdateMessage {
private SearchItemType itemType;
/**
* 条目更新类型
* 条目改变类型
*/
private SearchItemUpdateType updateType;
private DataItemChangeType changeType;
}
package cn.meteor.beyondclouds.modules.queue.message;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
/**
* 搜索条目改变操作类型
* 数据更新操作
* @author meteor
*/
public enum SearchItemUpdateType {
public enum DataItemChangeType {
/**
* 新增条目
......@@ -21,5 +19,15 @@ public enum SearchItemUpdateType {
/**
* 更新条目
*/
UPDATE
UPDATE,
/**
* 用户头像更新
*/
USER_AVATAR_UPDATE,
/**
* 用户昵称更新
*/
USER_NICK_UPDATE;
}
package cn.meteor.beyondclouds.modules.queue.service;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
/**
* @author meteor
......@@ -12,5 +12,5 @@ public interface IMessageQueueService {
* 发送搜索条目更新消息
* @param itemUpdateMessage
*/
void sendItemUpdateMessage(DataItemUpdateMessage itemUpdateMessage);
void sendItemUpdateMessage(DataItemChangeMessage itemUpdateMessage);
}
package cn.meteor.beyondclouds.modules.queue.service.impl;
import cn.meteor.beyondclouds.config.properties.BeyondCloudsKafkaTopicProperties;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
......@@ -27,7 +27,7 @@ public class MessageQueueServiceImpl implements IMessageQueueService {
}
@Override
public void sendItemUpdateMessage(DataItemUpdateMessage itemUpdateMessage) {
public void sendItemUpdateMessage(DataItemChangeMessage itemUpdateMessage) {
try {
kafkaTemplate.send(topicProperties.getSearchItemUpdate(), JsonUtils.toJson(itemUpdateMessage));
log.debug("发送kafka消息:{}", itemUpdateMessage.toString());
......
package cn.meteor.beyondclouds.modules.search.listener;
import cn.meteor.beyondclouds.core.listener.DataItemChangeListener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeType;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.search.service.ISearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* @author meteor
*/
@Component
@Slf4j
public class SearchItemItemChangeListener extends DataItemChangeListener {
private ISearchService searchService;
@Autowired
public SearchItemItemChangeListener(ISearchService searchService) {
this.searchService = searchService;
}
@Override
public void onDataItemAdd(DataItemChangeMessage dataItemChangeMessage) {
searchService.saveSearchItem(dataItemChangeMessage.getItemType(), String.valueOf(dataItemChangeMessage.getItemId()));
}
@Override
public void onDataItemUpdate(DataItemChangeMessage dataItemChangeMessage) {
searchService.deleteSearchItem(dataItemChangeMessage.getItemType(), String.valueOf(dataItemChangeMessage.getItemId()));
}
@Override
public void onDataItemDelete(DataItemChangeMessage dataItemChangeMessage) {
searchService.updateSearchItem(dataItemChangeMessage.getItemType(), String.valueOf(dataItemChangeMessage.getItemId()));
}
}
package cn.meteor.beyondclouds.modules.search.listener;
import cn.meteor.beyondclouds.core.listener.TopicMessageListener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.search.service.ISearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* @author meteor
*/
@Component
@Slf4j
public class SearchItemUpdateListener extends TopicMessageListener {
private ISearchService searchService;
@Autowired
public SearchItemUpdateListener(ISearchService searchService) {
this.searchService = searchService;
}
@Override
public void onItemUpdate(DataItemUpdateMessage dataItemUpdateMessage) throws Exception {
// 处理搜索条目更新
SearchItemUpdateType updateType = dataItemUpdateMessage.getUpdateType();
Serializable itemId = dataItemUpdateMessage.getItemId();
SearchItemType searchItemType = dataItemUpdateMessage.getItemType();
// 根据不同的更新类型调用对应的方法
switch (updateType) {
case ADD:
searchService.saveSearchItem(searchItemType, String.valueOf(itemId) );
break;
case DELETE:
searchService.deleteSearchItem(searchItemType, String.valueOf(itemId));
break;
case UPDATE:
searchService.updateSearchItem(searchItemType, String.valueOf(itemId));
break;
default:
break;
}
}
}
......@@ -14,8 +14,6 @@ import cn.meteor.beyondclouds.modules.question.entity.Question;
import cn.meteor.beyondclouds.modules.question.exception.QuestionServiceException;
import cn.meteor.beyondclouds.modules.question.exception.QuestionTagServiceException;
import cn.meteor.beyondclouds.modules.question.service.IQuestionService;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.search.entity.SearchItem;
import cn.meteor.beyondclouds.modules.search.entity.SearchItemId;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
......@@ -34,7 +32,6 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Optional;
......
......@@ -14,7 +14,7 @@ import cn.meteor.beyondclouds.modules.project.entity.Project;
import cn.meteor.beyondclouds.modules.project.service.IProjectService;
import cn.meteor.beyondclouds.modules.question.entity.Question;
import cn.meteor.beyondclouds.modules.question.service.IQuestionService;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.tag.entity.Tag;
......@@ -241,7 +241,7 @@ public class UserServiceImpl extends ServiceImpl<IUserMapper, User> implements I
// 发送消息到消息队列
messageQueueService
.sendItemUpdateMessage(
DataItemUpdateMessage.updateMessage(SearchItemType.USER, user.getUserId())
DataItemChangeMessage.updateMessage(SearchItemType.USER, user.getUserId())
);
}
......
package cn.meteor.beyondclouds.modules.queue.service.impl;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeMessage;
import cn.meteor.beyondclouds.modules.queue.message.DataItemChangeType;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import org.junit.Test;
......@@ -24,10 +24,10 @@ public class MessageQueueServiceImplTest {
// message.setId("111L");
// message.setMsg("hello");
// messageQueueService.send(message);
DataItemUpdateMessage itemUpdateMessage = new DataItemUpdateMessage();
DataItemChangeMessage itemUpdateMessage = new DataItemChangeMessage();
itemUpdateMessage.setItemId("123");
itemUpdateMessage.setItemType(SearchItemType.PROJECT);
itemUpdateMessage.setUpdateType(SearchItemUpdateType.ADD);
itemUpdateMessage.setChangeType(DataItemChangeType.ADD);
messageQueueService.sendItemUpdateMessage(itemUpdateMessage);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment