0%

Jakarta中websocket在Undertow的使用

  • 使用jakarta中的ServerEndpoint来定义Websocket服务
  • 通过Undertow实现的ServerWebSocketContainer,它实现了Jakarta的ServerContainer接口,可以注册端点到此容器中
  • 通过spring-websocket中的ServerEndpointExploer来将自己定义的ServerEndpoint发现并注册到容器中。

MAVEN依赖

引入websocket和spring-websocket的相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
      <dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
<version>1.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

WebsocketBean

这个类用来定义Websocket服务端和对于连接的相关操作等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
*/
@Slf4j
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebsocketBean {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;

private String userId;

private String ip;

//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
private static CopyOnWriteArraySet<WebsocketBean> webSocketSet = new CopyOnWriteArraySet<WebsocketBean>();

//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;


/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(@PathParam("userId") String userId, Session session) {
this.userId = userId;
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
//记录每个csci的所有连接 根据ip区分
String ipBySession = getIpBySession(session);
this.ip = ipBySession;
log.info("{}加入连接,当前在线用户:{}", userId, onlineCount);
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1

log.info("{}退出连接,当前在线用户:{}", userId, onlineCount);
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
log.info("【websocket:userId={}】接收到消息:{}", userId, message);
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}

/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
*
* @param message
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}

public static synchronized int getOnlineCount() {
return onlineCount;
}

public static synchronized void addOnlineCount() {
WebsocketBean.onlineCount++;
}

public static synchronized void subOnlineCount() {
WebsocketBean.onlineCount--;
}

public static void sendMessageAll(String message) throws IOException {
for (WebsocketBean item : webSocketSet) {
if (item.session.isOpen()) {
item.session.getBasicRemote().sendText(message);
}
}
}

public static Set<WebsocketBean> getAllClients() {
return webSocketSet;
}


public String getUserId() {
return this.userId;
}

public String getIp() {
return this.ip;
}

private String getIpBySession(Session session) {
UndertowSession undertowSession = (UndertowSession) session;
SocketAddress peerAddress = undertowSession.getWebSocketChannel().getPeerAddress();
String addressPort = peerAddress.toString();
String[] split = addressPort.replace("/", "").split(":");
if (split.length > 1) {
return split[0];
} else
return "";
}
}

WebsocketConfig

这个类用来创建ServerEndpointExporter,ServerEndpointExporter用来寻找spring容器中的所有ServerEndpoint来调用ServerWebsocketContainer中的注册方法将对应websocket服务注册到容器中。

1
2
3
4
5
6
7
@Configuration
public class WebsocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}

前端页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="js/jquery.min.js"></script>

</head>
<body>

<input type="text" id="message"/><button onclick="send();">发送</button>

</body>
<script type="text/javascript">

const ws = new WebSocket("ws://localhost:8080/ws/websocket/wch");
ws.onopen = function (evt) {
alert("连成功");
ws.send("wch html send")
}
ws.onmessage = function (evt) {
alert('Received Message: ' + evt.data);
ws.close();
};

ws.onclose = function (evt) {
alert('Connection closed.');
};

function send() {
ws.send($("#message").val());
}
</script>
</html>