Commit cb508aec by 段启岩

架构关系-建立父类-TopicMessageListener

parent 6d7a97a6
package cn.meteor.beyondclouds.core.listener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.io.Serializable;
import java.util.Optional;
/**
* @author meteor
*/
@Slf4j
public class TopicMessageListener {
@KafkaListener(topics = "${beyondclouds.kafka.topics.search-item-update}")
public final void itemUpdate(ConsumerRecord<?, String> record) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
DataItemUpdateMessage itemUpdateMessage;
try {
itemUpdateMessage = JsonUtils.toBean(kafkaMessage.get(), DataItemUpdateMessage.class);
log.debug("接收到kafka消息:{}", itemUpdateMessage.toString());
// 调用消息处理函数
onItemUpdate(itemUpdateMessage);
} catch (Exception e) {
e.printStackTrace();
log.error("DataItemUpdateMessage consume failed:{}", e.getMessage());
}
}
}
/**
* 数据更新事件
* @param dataItemUpdateMessage
*/
public void onItemUpdate(DataItemUpdateMessage dataItemUpdateMessage) throws Exception {
}
}
package cn.meteor.beyondclouds.modules.search.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* @author meteor
*/
public interface ISearchItemUpdateListener {
/**
* 搜索条目更新
* @param record
*/
void onSearchItemUpdate(ConsumerRecord<?, String> record);
}
package cn.meteor.beyondclouds.modules.search.listener.impl; package cn.meteor.beyondclouds.modules.search.listener;
import cn.meteor.beyondclouds.core.listener.TopicMessageListener;
import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage; import cn.meteor.beyondclouds.modules.queue.message.DataItemUpdateMessage;
import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType; import cn.meteor.beyondclouds.modules.queue.message.SearchItemUpdateType;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType; import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import cn.meteor.beyondclouds.modules.search.listener.ISearchItemUpdateListener;
import cn.meteor.beyondclouds.modules.search.service.ISearchService; import cn.meteor.beyondclouds.modules.search.service.ISearchService;
import cn.meteor.beyondclouds.util.JsonUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional;
/** /**
* @author meteor * @author meteor
*/ */
@Component @Component
@Slf4j @Slf4j
public class SearchItemUpdateListener implements ISearchItemUpdateListener { public class SearchItemUpdateListener extends TopicMessageListener {
private ISearchService searchService; private ISearchService searchService;
...@@ -30,40 +26,26 @@ public class SearchItemUpdateListener implements ISearchItemUpdateListener { ...@@ -30,40 +26,26 @@ public class SearchItemUpdateListener implements ISearchItemUpdateListener {
} }
@Override @Override
@KafkaListener(topics = "${beyondclouds.kafka.topics.search-item-update}") public void onItemUpdate(DataItemUpdateMessage dataItemUpdateMessage) throws Exception {
public void onSearchItemUpdate(ConsumerRecord<?, String> record) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value()); // 处理搜索条目更新
if (kafkaMessage.isPresent()) { SearchItemUpdateType updateType = dataItemUpdateMessage.getUpdateType();
DataItemUpdateMessage itemUpdateMessage; Serializable itemId = dataItemUpdateMessage.getItemId();
SearchItemType searchItemType = dataItemUpdateMessage.getItemType();
try {
itemUpdateMessage = JsonUtils.toBean(kafkaMessage.get(), DataItemUpdateMessage.class); // 根据不同的更新类型调用对应的方法
log.debug("接收到kafka消息:{}", itemUpdateMessage.toString()); switch (updateType) {
case ADD:
// 处理搜索条目更新 searchService.saveSearchItem(searchItemType, String.valueOf(itemId) );
SearchItemUpdateType updateType = itemUpdateMessage.getUpdateType(); break;
Serializable itemId = itemUpdateMessage.getItemId(); case DELETE:
SearchItemType searchItemType = itemUpdateMessage.getItemType(); searchService.deleteSearchItem(searchItemType, String.valueOf(itemId));
break;
// 根据不同的更新类型调用对应的方法 case UPDATE:
switch (updateType) { searchService.updateSearchItem(searchItemType, String.valueOf(itemId));
case ADD: break;
searchService.saveSearchItem(searchItemType, String.valueOf(itemId) ); default:
break; break;
case DELETE:
searchService.deleteSearchItem(searchItemType, String.valueOf(itemId));
break;
case UPDATE:
searchService.updateSearchItem(searchItemType, String.valueOf(itemId));
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
log.error("searchItemUpdateMessage covert failed:{}", e.getMessage());
}
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment