Commit 63b24893 by 段启岩

加入socket.io

parent 730b6501
......@@ -141,11 +141,14 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.corundumstudio.socketio/netty-socketio -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.18</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
......
package cn.meteor.beyondclouds.config;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author meteor
*/
@Configuration
public class SocketIOConfig {
@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setSocketConfig(socketConfig);
config.setHostname("0.0.0.0");
config.setPort(8002);
config.setBossThreads(1);
config.setAllowCustomRequests(true);
config.setPingInterval(25000);
return new SocketIOServer(config);
}
}
......@@ -2,15 +2,15 @@ package cn.meteor.beyondclouds.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
//import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author meteor
*/
@Configuration
//@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
// @Bean
// public ServerEndpointExporter serverEndpointExporter() {
// return new ServerEndpointExporter();
// }
}
......@@ -20,8 +20,8 @@ import java.util.regex.Pattern;
/**
* @author meteor
*/
@ServerEndpoint("/im")
@Component
//@ServerEndpoint("/im")
//@Component
@Slf4j
public class ImServer {
......
package cn.meteor.beyondclouds.modules.im.server;
public interface SocketIOService {
//推送的事件
public static final String PUSH_EVENT = "push_event";
// 启动服务
void start() throws Exception;
// 停止服务
void stop();
// 推送信息
void pushMessageToUser(String userId, String message);
}
package cn.meteor.beyondclouds.modules.im.server;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SocketIOServiceImpl implements SocketIOService {
// 用来存已连接的客户端
private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();
@Autowired
private SocketIOServer socketIOServer;
/**
* Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动
*
* @throws Exception
*/
@PostConstruct
private void autoStartup() throws Exception {
start();
}
/**
* Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
*
* @throws Exception
*/
@PreDestroy
private void autoStop() throws Exception {
stop();
}
@Override
public void start() throws Exception {
// 监听客户端连接
socketIOServer.addConnectListener(client -> {
client.sendEvent(PUSH_EVENT, "hello, 这是服务器发来的消息!");
});
// 监听客户端断开连接
socketIOServer.addDisconnectListener(client -> {
String loginUserNum = getParamsByClient(client);
if (loginUserNum != null) {
clientMap.remove(loginUserNum);
System.out.println("断开连接: " + loginUserNum);
System.out.println("断开连接: " + client.getSessionId());
client.disconnect();
}
});
// 处理自定义的事件,与连接监听类似
socketIOServer.addEventListener("text", Object.class, (client, data, ackSender) -> {
// TODO do something
client.getHandshakeData();
System.out.println( " 客户端:************ " + data);
});
socketIOServer.start();
System.out.println("START.");
}
@Override
public void stop() {
if (socketIOServer != null) {
socketIOServer.stop();
socketIOServer = null;
}
}
@Override
public void pushMessageToUser(String userId, String message) {
if (StringUtils.isNotBlank(userId)) {
SocketIOClient client = clientMap.get(userId);
if (client != null) {
client.sendEvent(PUSH_EVENT, message);
}
}
}
/**
* 此方法为获取client连接中的参数,可根据需求更改
* @param client
* @return
*/
private String getParamsByClient(SocketIOClient client) {
// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
List<String> list = params.get("loginUserNum");
if (list != null && list.size() > 0) {
return list.get(0);
}
return null;
}
public static Map<String, SocketIOClient> getClientMap() {
return clientMap;
}
public static void setClientMap(Map<String, SocketIOClient> clientMap) {
SocketIOServiceImpl.clientMap = clientMap;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment