supervisor和iOS的WebSocket

用supervisor作为守护进程

WebSocket在iOS上的实现

Supervisor

简介

SupervisorLinux/Unix 系统下的一个进程管理工具, 它是用 python2 开发的(没怎么找到它在python3下的用法). 它可以很方便的监听、 启动、 停止、 重启一个或多个进程. 在我的项目里主要是用它来开启我的 WebSocket 服务.

环境

CentOS 7
python 2.4+

安装

1
yum install supervisor

配置

如果使用 yum install supervisor 的命令安装,会生成默认配置 /etc/supervisord.conf 和目录 /etc/supervisord.d

在目录 /etc/supervisord.d 下创建两个新目录 conflog, conf 用来存放管理进程的配置, log 用于进程输出的日志.

然后修改一下 /etc/supervisord.conf[include] 部分, 用来加载我们自己的配置:

1
vi /etc/supervisord.conf

修改以下部分, 一般在文件较后面的地方:

1
2
[include]
files = supervisord.d/conf/*.conf

这样我们把进程配置文件放到 /etc/supervisord.d/conf/, Supervisor 也可以识别到了

编写进程配置文件

假如要创建一个新配置, 以我写好的 confd.conf 来看(; 为注释):

1
2
3
4
5
6
7
8
9
10
[program:confd]
directory = /root/IM_Server/imServer ; 命令目录
command = python3.7 websocket.py ; 启动命令,与shell中启动的命令是一样的
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 60 ; 启动60秒后没有异常退出, 就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile = /etc/supervisord.d/log/ws_run.log ;日志统一放在log目录下

要注意的坑

前面说到 Supervisor 是用 python2 开发的, 所以最好以 python2 来运行, 要确保在 shell 中直接运行 python -V 显示的版本是 python2

不过虽然是 python2 写的, 用来守护 python3 的程序是没有问题的

启动 Supervisor

1
supervisord -c /etc/supervisord.conf

没有出现错误提醒就表示运行成功了

检查和管理 Supervisor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 检查supervisord相关进程
ps aux | grep supervisord
# 停止某一个进程(programxxx), programxxx为[program:beepkg] 里配置的值, 这个示例就是 beepkg
supervisorctl stop programxxx
# 启动某个进程
supervisorctl start programxxx
# 重启某个进程
supervisorctl restart programxxx
# 查看进程状态
supervisorctl status
# 重启所有属于名为 groupworker 这个分组的进程(start, restart同理)
supervisorctl stop groupworker
# 停止全部进程, 注: start、 restart、 stop都不会载入最新的配置文件
supervisorctl stop all
# 载入最新的配置文件, 停止原有进程并按新的配置启动、 管理所有进程
supervisorctl reload
# 根据最新的配置文件, 启动新配置或有改动的进程, 配置没有改动的进程不会受影响而重启
supervisorctl update

参考博客

Supervisor的使用

iOS的WebSocket

weakSelf

用来解决循环引用的问题, 也以防在一个 block 中, 引用计数变为0造成对象销毁

1
#define WeakSelf(type)  __weak typeof(type) weak##type = type;

cocoapod

用这个来管理iOS第三方包, 包括下面要用到的 SocketRocket

安装

1
2
sudo gem install cocoapods
pod setup

配置文件

在你的项目根目录下, 新建一个文件名为 Podfile, 输入以下内容:

1
2
3
4
5
6
platform :ios, '8.0'
use_frameworks!

target 'AppName' do
pod 'SocketRocket'
end

然后执行

1
pod install

就能安装好 SocketRocket 了, 同时记得打开项目时打开 .xcworkspace 文件, 而不是原来那个

如果以后更新了 Podfile 文件, 则运行

1
pod update

SocketRocket

它是Facebook开发的OC语言里WebSocket的框架

大概说一下基本用法, 然后放上简单测试过的代码

创建websocket连接

1
2
3
4
5
6
7
#import <SocketRocket.h>
// 当然要引入头文件

self.socket = [[SRWebSocket alloc] initWithURLRequest:
[NSURLRequest requestWithURL:[NSURL URLWithString:[NSString stringWithFormat:@"ws://ip:port"]]]];
// 委托对象为本身
self.socket.delegate = self;

开启websocket连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 开启连接
[self.socket open];

// 成功连接的操作
- (void)webSocketDidOpen:(SRWebSocket *)webSocket {
//这里开启心跳信号
[self initHeartBeat];
if (webSocket == self.socket) {
NSLog(@"************************** socket 连接成功************************** ");
[self wsOperate:@"connect" data:@"user"];
}
}

// 连接失败的操作
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error {
if (webSocket == self.socket) {
NSLog(@"************************** socket 连接失败************************** ");
_socket = nil;
//连接失败可以重连
[self reConnect];
}
}

关闭连接

1
2
3
4
5
6
7
8
9
10
// 关闭连接
-(void)SRWebSocketClose{
if (self.socket){
[self.socket close];
self.socket = nil;

//断开连接时销毁心跳信号
[self destoryHeartBeat];
}
}

发送消息

这里的数据格式是写成我的项目需要的 JSON 数据, 可以根据需要更改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
- (void) wsOperate: (NSString *) action data:(NSString *)data {
NSDictionary *configDic = @{
@"action" : action,
@"data" : data
};
NSError *error;
NSString *sdata;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:configDic
options:0
error:&error];

if (!jsonData) {
NSLog(@" error: %@", error.localizedDescription);
return;
} else {
sdata = [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
}

WeakSelf(self);

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
if (weakself.socket != nil) {

// 只有 SR_OPEN 开启状态才能调 send 方法啊, 不然要崩
if (weakself.socket.readyState == SR_OPEN) {
NSLog(@"in:::::");
[weakself.socket send:sdata]; // 发送数据

} else if (weakself.socket.readyState == SR_CONNECTING) {
[self reConnect];

} else if (weakself.socket.readyState == SR_CLOSING || weakself.socket.readyState == SR_CLOSED) {
// websocket 断开了, 调用 reConnect 方法重连
[self reConnect];
}
} else {
//如果在发送数据,但是socket已经关闭,可以在再次打开
[self SRWebSocketOpen];
}
});
}

处理接受到的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#pragma mark - socket delegate
- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message {

if (webSocket == self.socket) {

NSLog(@"message:%@",message);
if(!message){
return;
}

// To Do:

}
}

使用方法

目前是把整个封装成一个类, 使用单例模式, 这样其他地方要用的时候就获取实例来使用

简单测试

黄框是主动发送给服务器收到的回复;
橙框是收到服务器主动发送的数据;

其他

关于心跳和重连可以看完整代码, 这些都是根据自己需要实现的, 跟 websocket 没有很强烈的关系

还有就是在这个框架的使用, 没有看到类似其他python websocket库中有 onerror 的委托, 不知道这里是如何处理的…

完整代码

以下代码只是第一版, 之后会根据需要修改代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// .h
//
// SocketRocketUtility.h
// multiThread
//
// Created by Pachirisu on 2019/5/5.
// Copyright © 2019 Pachirisu. All rights reserved.
//

#import <Foundation/Foundation.h>
#import <SocketRocket.h>

NS_ASSUME_NONNULL_BEGIN

@interface SocketRocketUtility : NSObject

/** 连接状态 */
@property (nonatomic,assign) SRReadyState socketReadyState;
// @property (nonatomic, copy) void (^didReceiveMessage)(id message);
+ (SocketRocketUtility *)instance;

- (void)SRWebSocketOpen; //开启连接
- (void)SRWebSocketClose; //关闭连接
- (void)wsOperate:(NSString *) action data:(NSString *)data;
- (void)registerNetworkNotifications; //监测网络状态

@end

NS_ASSUME_NONNULL_END
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//
// SocketRocketUtility.m
// multiThread
//
// Created by Pachirisu on 2019/5/5.
// Copyright © 2019 Pachirisu. All rights reserved.
//

#import "SocketRocketUtility.h"
//#import <AFNetworkReachabilityManager.h>

// 弱引用
#define WeakSelf(type) __weak typeof(type) weak##type = type;

// 确保运行在主线程的宏
#define dispatch_main_async_safe(block)\
if ([NSThread isMainThread]) {\
block();\
} else {\
dispatch_async(dispatch_get_main_queue(), block);\
}

@interface SocketRocketUtility()<SRWebSocketDelegate>
{
NSTimer * heartBeat;
NSTimeInterval reConnectTime;
//SocketDataType type;
NSString *host;
}

@property (nonatomic,strong) SRWebSocket *socket;

@end

@implementation SocketRocketUtility

+ (SocketRocketUtility *)instance{
static SocketRocketUtility *Instance = nil;
static dispatch_once_t predicate;
dispatch_once(&predicate, ^{
Instance = [[SocketRocketUtility alloc] init];
});
return Instance;
}

// 开启连接
- (void) SRWebSocketOpen{
//如果是同一个url return
if (self.socket) {
return;
}
if(self.socket.readyState == SR_OPEN){
return;
}

self.socket = [[SRWebSocket alloc] initWithURLRequest:
[NSURLRequest requestWithURL:[NSURL URLWithString:[NSString stringWithFormat:@"ws://172.18.32.97:6789"]]]];//这里填写你服务器的地址

NSLog(@"请求的websocket地址:%@",self.socket.url.absoluteString);
self.socket.delegate = self; //实现这个 SRWebSocketDelegate 协议
[self.socket open]; //open 就是直接连接了

}

// 成功连接的操作
- (void)webSocketDidOpen:(SRWebSocket *)webSocket {
//开启心跳
[self initHeartBeat];
if (webSocket == self.socket) {
NSLog(@"************************** socket 连接成功************************** ");
[self wsOperate:@"connect" data:@"user"];
}
}

// 连接失败的操作
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error {

if (webSocket == self.socket) {
NSLog(@"************************** socket 连接失败************************** ");
_socket = nil;
//连接失败就重连
[self reConnect];
}
}

// ws操作
- (void) wsOperate: (NSString *) action data:(NSString *)data {
NSDictionary *configDic = @{
@"action" : action,
@"data" : data
};
NSError *error;
NSString *sdata;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:configDic
options:0
error:&error];

if (!jsonData) {
NSLog(@" error: %@", error.localizedDescription);
return;
} else {
sdata = [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
}

WeakSelf(self);

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
if (weakself.socket != nil) {

// 只有 SR_OPEN 开启状态才能调 send 方法啊, 不然要崩
if (weakself.socket.readyState == SR_OPEN) {
NSLog(@"in:::::");
[weakself.socket send:sdata]; // 发送数据

} else if (weakself.socket.readyState == SR_CONNECTING) {
[self reConnect];

} else if (weakself.socket.readyState == SR_CLOSING || weakself.socket.readyState == SR_CLOSED) {
// websocket 断开了, 调用 reConnect 方法重连
[self reConnect];
}
} else {
//如果在发送数据,但是socket已经关闭,可以在再次打开
[self SRWebSocketOpen];
}
});
}

// disconnect操作

// 接受消息
#pragma mark - socket delegate
- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message {

if (webSocket == self.socket) {

NSLog(@"message:%@",message);
if(!message){
return;
}

// To Do:

}
}

// 关闭连接
-(void)SRWebSocketClose{
if (self.socket){
[self.socket close];
self.socket = nil;

//断开连接时销毁心跳
[self destoryHeartBeat];
}
}

// 心跳
- (void) ping{
if(self.socket.readyState == SR_OPEN) {
[self.socket sendPing:nil];
}
}

- (void) initHeartBeat{
dispatch_main_async_safe(^{
[self destoryHeartBeat];
heartBeat = [NSTimer timerWithTimeInterval:10 target:self selector:@selector(ping) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop]addTimer:heartBeat forMode:NSRunLoopCommonModes];
})
}

//取消心跳
- (void)destoryHeartBeat
{
dispatch_main_async_safe(^{
if (heartBeat) {
if ([heartBeat respondsToSelector:@selector(isValid)]){
if ([heartBeat isValid]){
[heartBeat invalidate];
heartBeat = nil;
}
}
}
})
}

// 重连
- (void)reConnect
{

[self SRWebSocketClose];
//超过一分钟就不再重连 所以只会重连5次 2^5 = 64
if (reConnectTime > 64*2) {
//您的网络状况不是很好, 请检查网络后重试
return;
}


dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(reConnectTime * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
self.socket = nil;
[self SRWebSocketOpen];
NSLog(@"重连");
});

//重连时间2的指数级增长
if (reConnectTime == 0) {
reConnectTime = 2;
}else{
reConnectTime *= 2;
}

}

- (SRReadyState)socketReadyState{
return self.socket.readyState;
}

- (void)dealloc{
[[NSNotificationCenter defaultCenter] removeObserver:self];
NSLog(@"SocketRocketUtility dealloced");
}

//- (void)registerNetworkNotifications{
// [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(networkChangedNote:) name:AFNetworkingReachabilityDidChangeNotification object:nil];
//}


@end