Commit 0c7a73f4 by 段启岩

添加kafka

parent 33983d82
......@@ -137,6 +137,11 @@
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
......
......@@ -2,10 +2,12 @@ package cn.meteor.beyondclouds;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
/**
* @author meteor
*/
@EnableElasticsearchRepositories(basePackages = "cn.meteor.beyondclouds.modules.search")
@SpringBootApplication
public class BeyondCloudsApplication {
......
package cn.meteor.beyondclouds.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author meteor
*/
@Configuration
public class KafkaInitialConfig {
/**
* 创建topic
* @return
*/
@Bean
public NewTopic initialTopic() {
return new NewTopic("beyond",8, (short) 1 );
}
}
package cn.meteor.beyondclouds.modules.queue.service;
import cn.meteor.beyondclouds.modules.queue.dto.Message;
/**
* @author meteor
*/
public interface IMessageQueueService {
/**
* 发送消息
*
*/
void sendMessage();
}
package cn.meteor.beyondclouds.modules.queue.service.impl;
import cn.meteor.beyondclouds.modules.queue.dto.Message;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import cn.meteor.beyondclouds.util.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author meteor
*/
@Service
public class MessageQueueServiceImpl implements IMessageQueueService {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public MessageQueueServiceImpl(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void send() {
Message<?> message = new Message();
kafkaTemplate.send()
kafkaTemplate.send("beyond", JsonUtils.toJson(message));
}
}
package cn.meteor.beyondclouds.modules.search.entity;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import lombok.Data;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.util.Date;
/**
* @author meteor
*/
@ToString
@Data
@Document(indexName = "search_item")
public class SearchItem {
@Id
String id;
/**
* 数据的主键
*/
@Field(index = false)
private String itemId;
/**
* 数据类型
*/
@Field(index = false)
private SearchItemType type;
/**
* 数据标题
*/
@Field(analyzer = "ik_max_word", type = FieldType.Text)
private String title;
/**
* 数据封面图
*/
@Field(index = false, type = FieldType.Text)
private String cover;
/**
* 数据摘要
*/
@Field(analyzer = "ik_max_word", type = FieldType.Text)
private String description;
/**
* 数据内容
*/
@Field(analyzer = "ik_max_word", type = FieldType.Text)
private String content;
/**
* 数据创建时间
*/
@Field(index = false)
private Date createTime;
/**
* 数据更新时间
*/
@Field(index = false)
private Date updateTime;
}
package cn.meteor.beyondclouds.modules.search.enums;
/**
* @author meteor
*/
public enum SearchItemType {
USER,
BLOG,
TAG,
PROJECT,
QUESTION
}
package cn.meteor.beyondclouds.modules.search.message;
import cn.meteor.beyondclouds.modules.search.entity.SearchItem;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public class SearchItemMessage implements Message<SearchItem> {
@Override
public SearchItem getPayload() {
return null;
}
@Override
public MessageHeaders getHeaders() {
return null;
}
}
package cn.meteor.beyondclouds.modules.search.repository;
import cn.meteor.beyondclouds.modules.search.entity.SearchItem;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
/**
* @author meteor
*/
@Repository
public interface ISearchRepository extends ElasticsearchRepository<SearchItem, String> {
}
package cn.meteor.beyondclouds.modules.search.service;
/**
* @author meteor
*/
public interface ISearchService {
}
package cn.meteor.beyondclouds.modules.search.service.impl;
import cn.meteor.beyondclouds.modules.search.service.ISearchService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
/**
* @author meteor
*/
@Service
@Slf4j
public class SearchServiceImpl implements ISearchService {
@KafkaListener(topics = "beyond")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
spring:
# 数据源
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/beyond_clouds?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 197442
# 邮箱
mail:
host: smtp.163.com
username: 13546386889@163.com
......@@ -21,11 +25,32 @@ spring:
socketFactory:
fallback: false
# elasticsearch
data:
elasticsearch:
repositories:
enabled: true
elasticsearch:
rest:
uris: http://localhost:9200
# kafka
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: beyond-clouds
enable-auto-commit: true
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# swagger
swagger:
enable: true
# 阿里云
aliyun:
access-key-id: LTAIeHWeydaWT3ZZ
access-key-secret: 2FTWlODpzEZjsBQw10NO6SUBMwYOcL
......@@ -40,6 +65,8 @@ aliyun:
region-id: cn-hangzhou
sign-name: 段启岩
template-code-map: {verifyCode: SMS_142384912}
# mybatis-plus
mybatis-plus:
mapper-locations: classpath*:cn/meteor/beyondclouds/modules/**/xml/*.xml
logging:
......@@ -47,6 +74,8 @@ logging:
cn:
meteor:
beyondclouds: debug
# 云里云外
beyondclouds:
debug: true
auth:
......
package cn.meteor.beyondclouds.modules.queue.service.impl;
import cn.meteor.beyondclouds.modules.queue.dto.Message;
import cn.meteor.beyondclouds.modules.queue.service.IMessageQueueService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;
@SpringBootTest
@RunWith(SpringRunner.class)
public class MessageQueueServiceImplTest {
@Autowired
private IMessageQueueService messageQueueService;
@Test
public void send() {
System.out.println(messageQueueService);
Message message = new Message();
message.setId("111L");
message.setMsg("hello");
messageQueueService.send(message);
}
}
\ No newline at end of file
package cn.meteor.beyondclouds.modules.search.repository;
import cn.meteor.beyondclouds.modules.search.entity.SearchItem;
import cn.meteor.beyondclouds.modules.search.enums.SearchItemType;
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ISearchRepositoryTest {
@Autowired
private ISearchRepository searchRepository;
@Test
public void test() {
System.out.println(searchRepository);
SearchItem searchItem = new SearchItem();
searchItem.setContent("我爱中国");
searchItem.setCover("null");
searchItem.setDescription("测试");
searchItem.setTitle("ok");
searchItem.setCreateTime(new Date());
searchItem.setItemId("aad");
searchItem.setType(SearchItemType.BLOG);
searchRepository.save(searchItem);
}
@Test
public void testQuery() {
QueryBuilder queryBuilder = QueryBuilders.matchQuery("content", "我们中国啊");
Iterable<SearchItem> searchItems = searchRepository.search(queryBuilder);
for (SearchItem searchItem : searchItems) {
System.out.println(searchItem);
}
}
/**
* 中文、拼音混合搜索
*
* @param content the content
* @return dis max query builder
*/
public DisMaxQueryBuilder structureQuery(String content) {
//使用dis_max直接取多个query中,分数最高的那一个query的分数即可
DisMaxQueryBuilder disMaxQueryBuilder = QueryBuilders.disMaxQuery();
//boost 设置权重,只搜索匹配name和disrector字段
QueryBuilder ikNameQuery = QueryBuilders.matchQuery("name", content).boost(2f);
QueryBuilder pinyinNameQuery = QueryBuilders.matchQuery("name.pinyin", content);
QueryBuilder ikDirectorQuery = QueryBuilders.matchQuery("director", content).boost(2f);
disMaxQueryBuilder.add(ikNameQuery);
disMaxQueryBuilder.add(pinyinNameQuery);
disMaxQueryBuilder.add(ikDirectorQuery);
return disMaxQueryBuilder;
}
}
\ No newline at end of file
......@@ -13,7 +13,7 @@ import static org.junit.Assert.*;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TagServiceImplTest {
public class TagServiceQueueServiceImplTest {
@Autowired
private ITagService tagService;
......
......@@ -13,7 +13,7 @@ import static org.junit.Assert.*;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TopicServiceImplTest {
public class TopicServiceQueueServiceImplTest {
@Autowired
private ITopicService topicService;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment