Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
M
my-yaf-project
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chenchuanwen
my-yaf-project
Commits
83ed1083
Commit
83ed1083
authored
Jan 21, 2019
by
chenchuanwen
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
push
parent
e9f96187
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
830 additions
and
141 deletions
+830
-141
bridge.php
scripts/crontab/push/bridge.php
+1
-1
bridgeBase.php
scripts/crontab/push/bridgeBase.php
+61
-0
bridgeStart.php
scripts/crontab/push/bridgeStart.php
+54
-139
bridgeStart_bak.php
scripts/crontab/push/bridgeStart_bak.php
+151
-0
newsServer.php
scripts/crontab/push/newsServer.php
+324
-0
newsServerBaseMessage.php
scripts/crontab/push/newsServerBaseMessage.php
+7
-0
runRedisBase.php
scripts/crontab/push/runRedisBase.php
+47
-0
runRedisCanReload.php
scripts/crontab/push/runRedisCanReload.php
+183
-0
runRedisPush.php
scripts/crontab/push/runRedisPush.php
+2
-1
No files found.
scripts/crontab/push/bridge.php
View file @
83ed1083
...
@@ -41,7 +41,7 @@ $http->on('request', function ($request, $response) {
...
@@ -41,7 +41,7 @@ $http->on('request', function ($request, $response) {
throw
new
Exception
(
$ex
->
getMessage
(),
$ex
->
getCode
());
throw
new
Exception
(
$ex
->
getMessage
(),
$ex
->
getCode
());
}
}
});
});
$http
->
start
();
$http
->
start
();
scripts/crontab/push/bridgeBase.php
0 → 100644
View file @
83ed1083
<?php
if
(
!
empty
(
$request
->
post
)){
try
{
$respData
=
$request
->
post
;
// echo json_encode($respData);
if
(
isset
(
$respData
[
'type'
])
&&
$respData
[
'type'
]
==
1
){
$info
=
$respData
[
'content'
];
$memberDao
=
$info
[
'className'
]
::
getInstance
(
\Our\DbNameConst
::
masterDBConnectName
);
echo
json_encode
(
$info
);
if
(
empty
(
$info
[
'params'
])){
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
array
());
}
else
{
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
$info
[
'params'
]);
}
$res
=
(
isset
(
$res
)
&&!
empty
(
$res
))
?
$res
:
false
;
unset
(
$memberDao
);
\Mysql\LinkMySQLModel
::
unsetDbConecet
();
if
(
$res
!==
false
){
echo
'success'
.
PHP_EOL
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
1
,
'message'
=>
'执行成功'
,
'data'
=>
$res
)));
}
else
{
echo
'fail1'
.
PHP_EOL
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
else
{
echo
'fail2'
.
PHP_EOL
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
catch
(
Exception
$ex
){
throw
new
Exception
(
$ex
->
getMessage
(),
$ex
->
getCode
());
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
else
{
// 通过链接参数热重载 worker 进程观察触发事件
if
(
!
empty
(
$request
->
get
)){
$act
=
$request
->
get
[
'act'
]
?
$request
->
get
[
'act'
]
:
''
;
if
(
$act
==
'reload'
)
{
echo
' ... Swoole Reloading ! ... '
.
PHP_EOL
.
PHP_EOL
;
// 触发 reload 之后, 貌似后面的代码也还是会执行的
$http
->
reload
();
echo
' ... Under Reload ! ... '
.
PHP_EOL
.
PHP_EOL
;
// 看看 reload 时是否会执行后续的代码
$response
->
end
(
json_encode
(
array
(
'status'
=>
1
,
'message'
=>
'重启成功'
)));
}
elseif
(
$act
==
'stop'
)
{
// 直接立即终止当前 worker 进程, 和 reload 的效果比较相似, 新的 worker 进程的 ID 和原来的一样
// 所以程序内部应该尽量避免使用 exit 而应该抛出异常在外部 catch
echo
' ... Swoole Exit ! ... '
.
PHP_EOL
.
PHP_EOL
;
exit
;
}
elseif
(
$act
==
'shutdown'
)
{
// 直接立即终止当前 worker 进程, 和 reload 的效果比较相似, 新的 worker 进程的 ID 和原来的
// 所以程序内部应该尽量避免使用 exit 而应该抛出异常在外部 catch
echo
' ... Swoole Shutdown ! ... '
.
PHP_EOL
.
PHP_EOL
;
$http
->
shutdown
();
echo
' ... After Swoole Shutdown ! ... '
.
PHP_EOL
.
PHP_EOL
;
}
}
else
{
$response
->
end
(
json_encode
(
array
(
'status'
=>
1
,
'message'
=>
'请求成功'
)));
}
}
scripts/crontab/push/bridgeStart.php
View file @
83ed1083
<?php
<?php
/**
*
* 脚本
*
* @author ccw <31435391@qq.com>
*/
define
(
"APPLICATION_PATH"
,
realpath
(
dirname
(
__FILE__
)
.
'/../../../'
));
//指向public的上一级
define
(
"APPLICATION_PATH"
,
realpath
(
dirname
(
__FILE__
)
.
'/../../../'
));
//指向public的上一级
require
APPLICATION_PATH
.
'/scripts/crontab/baseCli.php'
;
require
APPLICATION_PATH
.
'/scripts/crontab/common.php'
;
error_reporting
(
E_ALL
^
E_NOTICE
);
class
bridgeStart
{
/* config */
const
LISTEN
=
"tcp://192.168.2.15:5555"
;
const
MAXCONN
=
100
;
const
pidfile
=
__CLASS__
;
const
uid
=
81
;
const
gid
=
81
;
/**/
protected
$pool
=
NULL
;
protected
$zmq
=
NULL
;
public
function
__construct
()
$http
=
new
swoole_http_server
(
"0.0.0.0"
,
9501
);
{
$this
->
pidfile
=
'/var/run/'
.
self
::
pidfile
.
'.pid'
;
}
private
function
daemon
()
/**
{
* 测试在 $server 外部注册全局自定义属性, 看看会不会被覆盖
if
(
file_exists
(
$this
->
pidfile
))
{
*/
echo
"The file
$this->pidfile
exists.
\n
"
;
exit
();
}
$pid
=
pcntl_fork
();
/**
if
(
$pid
==
-
1
)
{
* 测试在 $server 外部注册全局自定义属性, 看看会不会被覆盖
die
(
'could not fork'
);
*/
}
else
if
(
$pid
)
{
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit
(
$pid
);
}
else
{
// we are the child
file_put_contents
(
$this
->
pidfile
,
getmypid
());
posix_setuid
(
self
::
uid
);
posix_setgid
(
self
::
gid
);
return
(
getmypid
());
}
}
public
function
callback
(
$instance
,
$channelName
,
$message
){
$orderService
=
\Business\Order\OrderServiceModel
::
getInstance
();
$orderService
->
testPush
(
$message
);
}
protected
function
status
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
printf
(
"%s already running, pid = %s
\n
"
,
$this
->
argv
[
0
],
$pid
);
}
else
{
printf
(
"%s haven't running
\n
"
,
$this
->
argv
[
0
]);
}
}
private
function
start
()
{
$pid
=
$this
->
daemon
();
$conf
=
\Yaf\Registry
::
get
(
'config'
)
->
get
(
'redis.database.params'
);
$http
=
new
swoole_http_server
(
"0.0.0.0"
,
9501
);
$http
->
on
(
'request'
,
function
(
$request
,
$response
)
{
try
{
$respData
=
$request
->
post
;
// echo json_encode($respData);
if
(
isset
(
$respData
[
'type'
])
&&
$respData
[
'type'
]
==
1
){
$http
->
myWorkerVar
=
'global'
;
$info
=
$respData
[
'content'
];
$memberDao
=
$info
[
'className'
]
::
getInstance
(
\Our\DbNameConst
::
masterDBConnectName
);
echo
json_encode
(
$info
);
if
(
empty
(
$info
[
'params'
])){
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
array
());
}
else
{
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
$info
[
'params'
]);
}
$res
=
(
isset
(
$res
)
&&!
empty
(
$res
))
?
$res
:
false
;
unset
(
$memberDao
);
\Mysql\LinkMySQLModel
::
unsetDbConecet
();
if
(
$res
!==
false
){
echo
'success'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
1
,
'message'
=>
'执行成功'
,
'data'
=>
$res
)));
}
else
{
echo
'fail1'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
else
{
echo
'fail2'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
catch
(
Exception
$ex
){
throw
new
Exception
(
$ex
->
getMessage
(),
$ex
->
getCode
());
}
});
$http
->
start
();
}
$http
->
set
(
array
(
private
function
reload
(){
'worker_num'
=>
5
,
if
(
file_exists
(
$this
->
pidfile
))
{
'daemonize'
=>
false
,
$pid
=
file_get_contents
(
$this
->
pidfile
);
));
//posix_kill(posix_getpid(), SIGHUP);
posix_kill
(
$pid
,
SIGHUP
);
}
}
protected
function
restart
(){
$this
->
stop
();
sleep
(
1
);
$this
->
start
();
}
private
function
stop
()
// 服务器启动时执行一次
{
$http
->
on
(
'Start'
,
function
(
swoole_http_server
$server
)
{
echo
''
;
});
if
(
file_exists
(
$this
->
pidfile
))
{
// 服务器启动时执行一次
$pid
=
file_get_contents
(
$this
->
pidfile
);
$http
->
on
(
'ManagerStart'
,
function
(
swoole_http_server
$server
)
{
posix_kill
(
$pid
,
9
);
echo
'ManagerStart: '
.
PHP_EOL
.
PHP_EOL
;
unlink
(
$this
->
pidfile
);
});
}
}
private
function
help
(
$proc
)
// 每个 Worker 进程启动或重启时都会执行
{
$http
->
on
(
'WorkerStart'
,
function
(
swoole_http_server
$server
,
$workerId
)
{
printf
(
"%s start | stop | help | restart | reload
\n
"
,
$proc
);
// 通过重新加载外部文件来重载代码和释放之前占用的内存
}
//include_once __DIR__ . DIRECTORY_SEPARATOR .'workerstart.php';
private
$argv
;
// 下面这些直接写在当前文件中的代码即便重载也不会变化
public
function
main
(
$argv
)
{
echo
'WorkerStart: '
.
PHP_EOL
.
PHP_EOL
;
$this
->
argv
=
$argv
;
echo
' Worker ID: '
.
$workerId
.
PHP_EOL
.
PHP_EOL
;
if
(
count
(
$argv
)
<
2
)
{
// 启动服务器后, 去掉下面这行注释, 然后 reload , 该语句也不会执行的
printf
(
"please input help parameter
\n
"
);
//echo ' reloaded ! ' . PHP_EOL . PHP_EOL;
exit
();
// 应该把这里的回调事件代码写在另一个文件中来 include 而不是直接写在这里
}
// 注意即便是 include_once , reload 也会重新加载的, 但在你的逻辑控制中是有效的
if
(
$argv
[
1
]
===
'stop'
)
{
});
// 每次连接时(相当于每个浏览器第一次打开页面时)执行一次, reload 时连接不会断开, 也就不会再次触发该事件
$http
->
on
(
'Connect'
,
function
(
swoole_http_server
$server
,
$fd
,
$reactorThreadId
)
{
echo
'Worker ID: '
.
$server
->
worker_id
.
';'
.
'fd: '
.
$fd
.
' , fromId: '
.
$reactorThreadId
.
PHP_EOL
;
});
// 浏览器连接服务器后, 页面上的每个请求均会执行一次,
// 每次打开链接页面默认都是接收两个请求, 一个是正常的数据请求, 一个 favicon.ico 的请求
$http
->
on
(
'request'
,
function
(
$request
,
$response
)
use
(
$http
)
{
require_once
APPLICATION_PATH
.
'/scripts/crontab/common.php'
;
require
APPLICATION_PATH
.
'/scripts/crontab/push/bridgeBase.php'
;
});
$http
->
start
();
$this
->
stop
();
}
else
if
(
$argv
[
1
]
===
'start'
)
{
$this
->
start
();
}
else
if
(
$argv
[
1
]
===
'restart'
){
$this
->
restart
();
}
else
if
(
$argv
[
1
]
===
'status'
){
$this
->
status
();
}
else
if
(
$argv
[
1
]
===
'reload'
){
$this
->
reload
();
}
else
{
$this
->
help
(
$argv
[
0
]);
}
}
}
$cgse
=
new
bridgeStart
();
$cgse
->
main
(
$argv
);
\ No newline at end of file
scripts/crontab/push/bridgeStart_bak.php
0 → 100644
View file @
83ed1083
<?php
define
(
"APPLICATION_PATH"
,
realpath
(
dirname
(
__FILE__
)
.
'/../../../'
));
//指向public的上一级
require
APPLICATION_PATH
.
'/scripts/crontab/baseCli.php'
;
require
APPLICATION_PATH
.
'/scripts/crontab/common.php'
;
error_reporting
(
E_ALL
^
E_NOTICE
);
class
bridgeStart
{
/* config */
const
LISTEN
=
"tcp://192.168.2.15:5555"
;
const
MAXCONN
=
100
;
const
pidfile
=
__CLASS__
;
const
uid
=
81
;
const
gid
=
81
;
/**/
protected
$pool
=
NULL
;
protected
$zmq
=
NULL
;
public
function
__construct
()
{
$this
->
pidfile
=
'/var/run/'
.
self
::
pidfile
.
'.pid'
;
}
private
function
daemon
()
{
if
(
file_exists
(
$this
->
pidfile
))
{
echo
"The file
$this->pidfile
exists.
\n
"
;
exit
();
}
$pid
=
pcntl_fork
();
if
(
$pid
==
-
1
)
{
die
(
'could not fork'
);
}
else
if
(
$pid
)
{
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit
(
$pid
);
}
else
{
// we are the child
file_put_contents
(
$this
->
pidfile
,
getmypid
());
posix_setuid
(
self
::
uid
);
posix_setgid
(
self
::
gid
);
return
(
getmypid
());
}
}
public
function
callback
(
$instance
,
$channelName
,
$message
){
$orderService
=
\Business\Order\OrderServiceModel
::
getInstance
();
$orderService
->
testPush
(
$message
);
}
protected
function
status
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
printf
(
"%s already running, pid = %s
\n
"
,
$this
->
argv
[
0
],
$pid
);
}
else
{
printf
(
"%s haven't running
\n
"
,
$this
->
argv
[
0
]);
}
}
private
function
start
()
{
$pid
=
$this
->
daemon
();
$conf
=
\Yaf\Registry
::
get
(
'config'
)
->
get
(
'redis.database.params'
);
$http
=
new
swoole_http_server
(
"0.0.0.0"
,
9501
);
$http
->
on
(
'request'
,
function
(
$request
,
$response
)
{
try
{
$respData
=
$request
->
post
;
// echo json_encode($respData);
if
(
isset
(
$respData
[
'type'
])
&&
$respData
[
'type'
]
==
1
){
$info
=
$respData
[
'content'
];
$memberDao
=
$info
[
'className'
]
::
getInstance
(
\Our\DbNameConst
::
masterDBConnectName
);
echo
json_encode
(
$info
);
if
(
empty
(
$info
[
'params'
])){
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
array
());
}
else
{
$res
=
call_user_func_array
(
array
(
$memberDao
,
$info
[
'method'
]),
$info
[
'params'
]);
}
$res
=
(
isset
(
$res
)
&&!
empty
(
$res
))
?
$res
:
false
;
unset
(
$memberDao
);
\Mysql\LinkMySQLModel
::
unsetDbConecet
();
if
(
$res
!==
false
){
echo
'success'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
1
,
'message'
=>
'执行成功'
,
'data'
=>
$res
)));
}
else
{
echo
'fail1'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
else
{
echo
'fail2'
;
$response
->
end
(
json_encode
(
array
(
'status'
=>
0
,
'message'
=>
'执行失败'
)));
}
}
catch
(
Exception
$ex
){
throw
new
Exception
(
$ex
->
getMessage
(),
$ex
->
getCode
());
}
});
$http
->
start
();
}
private
function
reload
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
//posix_kill(posix_getpid(), SIGHUP);
posix_kill
(
$pid
,
SIGHUP
);
}
}
protected
function
restart
(){
$this
->
stop
();
sleep
(
1
);
$this
->
start
();
}
private
function
stop
()
{
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
posix_kill
(
$pid
,
9
);
unlink
(
$this
->
pidfile
);
}
}
private
function
help
(
$proc
)
{
printf
(
"%s start | stop | help | restart | reload
\n
"
,
$proc
);
}
private
$argv
;
public
function
main
(
$argv
)
{
$this
->
argv
=
$argv
;
if
(
count
(
$argv
)
<
2
)
{
printf
(
"please input help parameter
\n
"
);
exit
();
}
if
(
$argv
[
1
]
===
'stop'
)
{
$this
->
stop
();
}
else
if
(
$argv
[
1
]
===
'start'
)
{
$this
->
start
();
}
else
if
(
$argv
[
1
]
===
'restart'
){
$this
->
restart
();
}
else
if
(
$argv
[
1
]
===
'status'
){
$this
->
status
();
}
else
if
(
$argv
[
1
]
===
'reload'
){
$this
->
reload
();
}
else
{
$this
->
help
(
$argv
[
0
]);
}
}
}
$cgse
=
new
bridgeStart
();
$cgse
->
main
(
$argv
);
\ No newline at end of file
scripts/crontab/push/newsServer.php
0 → 100644
View file @
83ed1083
<?php
/**
*
* 脚本
*
* @author ccw <31435391@qq.com>
*/
define
(
"APPLICATION_PATH"
,
realpath
(
dirname
(
__FILE__
)
.
'/../../../'
));
//指向public的上一级
define
(
"KEY_REDIS_PRE"
,
"PHPREDIS_SESSION:"
);
define
(
"KEY_PRE"
,
"qm_:han_member:"
);
define
(
"KEY_PRE_STORE"
,
"qm_:han_store:"
);
require
APPLICATION_PATH
.
'/scripts/crontab/common.php'
;
header
(
"Access-Control-Allow-Origin: *"
);
header
(
'Access-Control-Allow-Method: *'
);
header
(
'Access-Control-Allow-Headers: x-requested-with,content-type'
);
header
(
'Content-type: application/json'
);
$conf
=
\Yaf\Registry
::
get
(
'config'
)
->
get
(
'redis.database.params'
);
$serv
=
new
swoole_websocket_server
(
"0.0.0.0"
,
9502
);
// swoole 监听一个TCP端口来处理第三方客户端的消息请求
$tcp_server
=
$serv
->
addlistener
(
'0.0.0.0'
,
9503
,
SWOOLE_SOCK_TCP
);
$tcp_server
->
set
(
array
(
'open_length_check'
=>
false
,
'package_length_type'
=>
'N'
,
'package_length_offset'
=>
0
,
'package_max_length'
=>
8192
,
'open_eof_check'
=>
false
,
'package_eof'
=>
"
\r\n
"
));
$serv
->
on
(
'Open'
,
function
(
$server
,
$req
)
use
(
$conf
){
$server
->
push
(
$req
->
fd
,
responseJson
(
1
,
"conection"
,
"连接成功"
,
[
'method'
=>
'connection'
,
'status'
=>
1
,
'error_code'
=>
0
]));
});
function
saveMessage
(){
}
$serv
->
on
(
'Message'
,
function
(
$server
,
$frame
)
use
(
$conf
){
if
(
$frame
->
data
==
'?'
){
$server
->
push
(
$frame
->
fd
,
'!'
);
}
else
{
// redis 没有设置密码
$redis
=
new
Redis
();
$redis
->
connect
(
$conf
[
'host'
],
$conf
[
'port'
]);
if
(
!
empty
(
$conf
[
'password'
])){
$redis
->
auth
(
$conf
[
'password'
]);
}
// echo $frame->data;
echo
$frame
->
data
;
echo
11111111111
;
$rev_data
=
json_decode
(
$frame
->
data
,
true
);
//echo json_encode($rev_data);
$method
=
isset
(
$rev_data
[
'method'
])
?
$rev_data
[
'method'
]
:
''
;
//echo $method;
$key
=
isset
(
$rev_data
[
'key'
])
?
$rev_data
[
'key'
]
:
''
;
$key
=
isset
(
$rev_data
[
'data'
][
'key'
])
?
$rev_data
[
'data'
][
'key'
]
:
$key
;
if
(
$method
==
'sendMsg'
){
$rev_data
=
$rev_data
[
'data'
];
$realKey
=
KEY_REDIS_PRE
.
$key
;
$sessData
=
$redis
->
get
(
$realKey
);
// $sessDataSpilt1=explode(';',$sessData);
// $sessDataSpilt2=explode('|',$sessDataSpilt1[1]);
// $sessDataSpilt3=explode(':', $sessDataSpilt2[1]);
// $res= $sessDataSpilt3[1];
$matchs
=
array
();
preg_match
(
"/member_id\|i:(\d+)/i"
,
$sessData
,
$matchs
);
$memberId
=
$matchs
[
1
];
$messageService
=
\Business\Message\MessageServiceModel
::
getInstance
();
$sendFd
=
$redis
->
hGet
(
KEY_PRE
.
$rev_data
[
'toId'
],
'fd'
);
$sendTime
=
$redis
->
hGet
(
KEY_PRE
.
$memberId
,
'sendTime'
);
$diffTime
=
time
()
-
$sendTime
;
if
(
$diffTime
==
0
){
$async_login_data
=
[
'method'
=>
'sendMsg'
,
'status'
=>
0
,
'message'
=>
'一秒钟只能发一次消息'
,
// eat drink play more and more
'data'
=>
array
(
'fd'
=>
$frame
->
fd
,
'method'
=>
'sendMsg'
,
)
];
$server
->
push
(
$frame
->
fd
,
responseJson
(
$async_login_data
[
'status'
],
$async_login_data
[
'method'
],
$async_login_data
[
'message'
],
$async_login_data
[
'data'
]));
}
else
{
if
(
!
empty
(
$memberId
)){
$sendMessage
[
'message'
][
'type'
]
=
$rev_data
[
'messageType'
];
$strId
=
time
()
.
(
string
)
$memberId
;
$sendMessage
[
'id'
]
=
(
int
)
$strId
;
$sendMessage
[
'message'
][
'title'
]
=
(
isset
(
$rev_data
[
'messageTitle'
])
&&!
empty
(
$rev_data
[
'messageTitle'
]))
?
$rev_data
[
'messageTitle'
]
:
''
;
$sendMessage
[
'message'
][
'content'
]
=
$rev_data
[
'messageContent'
];
$sendMessage
[
'message'
][
'url'
]
=
isset
(
$rev_data
[
'messageUrl'
])
?
$rev_data
[
'messageUrl'
]
:
''
;
$sendMessage
[
'message'
][
'href'
]
=
isset
(
$rev_data
[
'messageHref'
])
?
$rev_data
[
'messageHref'
]
:
''
;
$sendMessage
[
'message'
][
'price'
]
=
isset
(
$rev_data
[
'messagePrice'
])
?
$rev_data
[
'messagePrice'
]
:
''
;
$sendMessage
[
'message'
][
'thumb'
]
=
isset
(
$rev_data
[
'messageThumb'
])
?
$rev_data
[
'messageThumb'
]
:
''
;
$sendMessage
[
'message'
][
'gmtCreate'
]
=
time
();
$sendMessage
[
'fromId'
]
=
$memberId
;
$sendMessage
[
'fromUserId'
]
=
$rev_data
[
'fromUserId'
];
$sendMessage
[
'fromUserName'
]
=
$rev_data
[
'fromUserName'
];
$sendMessage
[
'fromAvatar'
]
=
$messageService
->
getAvatar
(
$rev_data
[
'fromType'
],
$rev_data
[
'fromUserId'
]);
$sendMessage
[
'fromType'
]
=
$rev_data
[
'fromType'
];
$sendMessage
[
'toId'
]
=
$rev_data
[
'toId'
];
$sendMessage
[
'toUserId'
]
=
$rev_data
[
'toUserId'
];
$sendMessage
[
'toUserName'
]
=
$rev_data
[
'toUserName'
];
$sendMessage
[
'toType'
]
=
$rev_data
[
'toType'
];
if
(
!
empty
(
$sendFd
)){
echo
'发送fd:'
.
$sendFd
;
if
(
$sendFd
!=
$frame
->
fd
){
$server
->
push
(
$sendFd
,
responseJson
(
1
,
"fromMsg"
,
"success"
,
$sendMessage
));
}
}
echo
'afterSendFd:'
.
$sendFd
;
}
//echo json_encode($sendMessage);
//$redis->rPush("message_center",serialize($sendMessage));
$redis
->
hSet
(
KEY_PRE
.
$memberId
,
'sendTime'
,
time
());
$server
->
push
(
$frame
->
fd
,
responseJson
(
1
,
"sendMsg"
,
"success"
,
$sendMessage
));
$messageService
->
addMessage
(
$sendMessage
[
'fromId'
],
$sendMessage
[
'fromUserId'
],
$sendMessage
[
'fromUserName'
],
$sendMessage
[
'fromType'
],
$sendMessage
[
'message'
],
$sendMessage
[
'toId'
],
$sendMessage
[
'toUserId'
],
$sendMessage
[
'toUserName'
],
$sendMessage
[
'toType'
],
$sendMessage
[
'id'
]);
$sendMessage
[
'message'
]
=
serialize
(
$sendMessage
[
'message'
]);
$redis
->
rPush
(
'push_center'
,
serialize
(
$sendMessage
));
$redis
->
hIncrBy
(
KEY_PRE
.
$rev_data
[
'toId'
],
'unread_message_count'
,
\Our\ApiConst
::
one
);
}
$redis
->
close
();
unset
(
$redis
);
unset
(
$memberDb0Redis
);
// $messageService->unsetDb();
unset
(
$messageService
);
\Mysql\LinkMySQLModel
::
unsetDbConecet
();
}
else
{
$realKey
=
KEY_REDIS_PRE
.
$key
;
$sessData
=
$redis
->
get
(
$realKey
);
// $sessDataSpilt1=explode(';',$sessData);
// $sessDataSpilt2=explode('|',$sessDataSpilt1[1]);
// $sessDataSpilt3=explode(':', $sessDataSpilt2[1]);
// $res= $sessDataSpilt3[1];
$res
=
$sessData
;
//echo $res;
$matchs
=
array
();
preg_match
(
"/member_id\|i:(\d+)/i"
,
$sessData
,
$matchs
);
//现在
$storeMatchs
=
array
();
preg_match
(
"/store_id\|i:(\d+)/i"
,
$sessData
,
$storeMatchs
);
//现在
// preg_match("/member_id\|i[^\d]{1,3}(\d+)/i",$res,$matchs);过去
$memberId
=
$matchs
[
1
];
if
(
isset
(
$storeMatchs
[
1
])
&&
$storeMatchs
[
1
]
>
0
){
$storeId
=
$storeMatchs
[
1
];
}
else
{
$storeId
=
0
;
}
echo
"success:memberid="
.
$memberId
.
'login'
.
"
\n
"
;
$redis
->
expire
(
$realKey
,
\Our\ApiConst
::
tenDaySecond
);
if
(
$memberId
==
(
int
)
$memberId
){
switch
(
$method
)
{
case
'join'
:
$async_login_data
=
[
'method'
=>
'login'
,
'status'
=>
1
,
'message'
=>
"用户登录成功"
,
// eat drink play more and more
'data'
=>
array
(
'fd'
=>
$frame
->
fd
,
'method'
=>
$method
,
)
];
echo
'set:'
.
KEY_PRE
.
$memberId
,
'fd:'
.
$frame
->
fd
.
"
\r\n
"
;
$redis
->
hSet
(
KEY_PRE
.
$memberId
,
'fd'
,
$frame
->
fd
);
if
(
$storeId
){
echo
KEY_PRE_STORE
.
$storeId
.
'fd='
.
$frame
->
fd
;
$redis
->
hSet
(
KEY_PRE_STORE
.
$storeId
,
'fd'
,
$frame
->
fd
);
}
$server
->
push
(
$frame
->
fd
,
responseJson
(
$async_login_data
[
'status'
],
$async_login_data
[
'method'
],
$async_login_data
[
'message'
],
$async_login_data
[
'data'
]));
$redis
->
close
();
break
;
default
:
break
;
}
}
else
{
$async_login_data
=
[
'method'
=>
'login'
,
'status'
=>
0
,
'message'
=>
"用户授权失败"
,
// eat drink play more and more
'data'
=>
array
(
'fd'
=>
$frame
->
fd
,
'method'
=>
$method
,
)
];
$server
->
push
(
$frame
->
fd
,
responseJson
(
$async_login_data
[
'status'
],
$async_login_data
[
'method'
],
$async_login_data
[
'message'
],
$async_login_data
[
'data'
]));
$server
->
close
(
$frame
->
fd
);
}
}
}
});
/**
* 接受 client 端信息推送,涉及问题
* 1. 服务端认证、授权(安全)
* 2. 推送消息落地(redis队列->MySQL)
* 3. 推送客户端和消息中心交互, TCP
* 4. 按照用户类型(channel)推送
*/
$tcp_server
->
on
(
'connect'
,
function
(
$serv
,
$fd
)
use
(
$conf
){
echo
"Client:Connect
\r\n
"
;
});
/**
* 接受 client 端信息推送,涉及问题
* 1. 服务端认证、授权(安全)
* 2. 推送消息落地(redis队列->MySQL)
* 3. 推送客户端和消息中心交互, TCP
* 4. 按照用户类型(channel)推送
*/
$tcp_server
->
on
(
'receive'
,
function
(
$serv
,
$fd
,
$from_id
,
$data
)
use
(
$conf
)
{
// echo $data;
//$data = json_decode($data, true);
// if(empty($data['data']) && !isset($data['data'])){
// $serv->send($fd, responseJson(1,"fail", ['method' => 'receive', 'error_code' => 1, 'status' => 0]));
// $serv->close($fd);
// return;
// }
if
(
empty
(
$data
)
&&
!
isset
(
$data
)){
$serv
->
send
(
$fd
,
responseJson
(
1
,
"fail"
,
[
'method'
=>
'receive'
,
'error_code'
=>
1
,
'status'
=>
0
]));
$serv
->
close
(
$fd
);
return
;
}
// $s = json_encode($data['data']);
echo
$data
;
// 推送 存入redis、最后入库(MySQL)
$redis
=
new
Redis
();
$redis
->
connect
(
$conf
[
'host'
],
$conf
[
'port'
]);
if
(
!
empty
(
$conf
[
'password'
])){
$redis
->
auth
(
$conf
[
'password'
]);
}
$key
=
trim
(
$data
,
"
\r\n
"
);
$data
=
$redis
->
get
(
$key
);
$redis
->
delete
(
$key
);
$data
=
json_decode
(
$data
,
true
);
if
(
true
)
{
$serv
->
send
(
$fd
,
responseJson
(
1
,
"success"
,
[
'method'
=>
'receive'
,
'error_code'
=>
0
,
'status'
=>
1
]));
$serv
->
close
(
$fd
);
if
(
!
isset
(
$data
[
'uid'
])
||
empty
(
$data
[
'uid'
])){
foreach
(
$data
[
'data'
]
as
$value
){
if
(
isset
(
$value
[
'type'
])
&&
$value
[
'type'
]
>
100
){
$sendStoreFd
=
$redis
->
hGet
(
KEY_PRE_STORE
.
$value
[
'storeId'
],
'fd'
);
if
(
!
empty
(
$sendStoreFd
)){
if
(
!
$value
[
'message'
]){
$value
[
'message'
]
=
new
\stdClass
();
}
$serv
->
push
(
$sendStoreFd
,
responseJson
(
1
,
"fromMsg"
,
"success"
,
$value
));
}
}
else
{
$async_data
=
$redis
->
rPush
(
"message_center"
,
serialize
(
$value
));
if
(
$async_data
){
$value
[
'message'
]
=
unserialize
(
$value
[
'message'
]);
$sendFd
=
$redis
->
hGet
(
KEY_PRE
.
$value
[
'toId'
],
'fd'
);
$redis
->
hIncrBy
(
KEY_PRE
.
$value
[
'toId'
],
'unread_message_count'
,
\Our\ApiConst
::
one
);
if
(
!
empty
(
$sendFd
)){
if
(
!
$value
[
'message'
]){
$value
[
'message'
]
=
new
\stdClass
();
}
$serv
->
push
(
$sendFd
,
responseJson
(
1
,
"fromMsg"
,
"success"
,
$value
));
$redis
->
hSet
(
KEY_PRE
.
$value
[
'fromId'
],
'sendTime'
,
time
());
}
}
}
}
}
else
{
$async_data
=
$redis
->
rPush
(
"message_center"
,
serialize
(
$data
[
'data'
]));
if
(
$async_data
){
$sendFd
=
$redis
->
hGet
(
KEY_PRE
.
$data
[
'data'
][
'toId'
],
'fd'
);
$data
[
'data'
][
'message'
]
=
unserialize
(
$data
[
'data'
][
'message'
]);
$redis
->
hIncrBy
(
KEY_PRE
.
$data
[
'data'
][
'toId'
],
'unread_message_count'
,
\Our\ApiConst
::
one
);
if
(
!
empty
(
$sendFd
)){
if
(
!
$data
[
'data'
][
'message'
]){
$data
[
'data'
][
'message'
]
=
new
\stdClass
();
}
$serv
->
push
(
$sendFd
,
responseJson
(
$sendFd
,
"fromMsg"
,
"success"
,
$data
[
'data'
]));
$redis
->
hSet
(
KEY_PRE
.
$data
[
'data'
][
'fromId'
],
'sendTime'
,
time
());
}
}
}
}
else
{
$serv
->
send
(
$fd
,
responseJson
(
1
,
'fromMsg'
,
"fail"
,
[
'method'
=>
'receive'
,
'error_code'
=>
110
,
'status'
=>
0
]));
}
$redis
->
close
();
unset
(
$redis
);
});
$tcp_server
->
on
(
'close'
,
function
(
$serv
,
$fd
)
{
echo
"Client: Close.
\n
"
;
});
$serv
->
on
(
'Close'
,
function
(
$server
,
$fd
)
use
(
$conf
){
$key
=
'user_'
.
$fd
;
$redis
=
new
Redis
();
$redis
->
connect
(
$conf
[
'host'
],
$conf
[
'port'
]);
if
(
!
empty
(
$conf
[
'password'
])){
$redis
->
auth
(
$conf
[
'password'
]);
}
$redis
->
hDel
(
"client_list"
,
$key
);
echo
"connection close: "
.
$fd
;
});
function
responseJson
(
$status
=
1
,
$method
,
$message
=
''
,
$data
=
array
())
{
if
(
!
$data
[
'message'
]){
$data
[
'message'
]
=
new
\stdClass
();
}
$data
=
[
'status'
=>
$status
,
'method'
=>
$method
,
'message'
=>
$message
,
'data'
=>
$data
,
];
echo
json_encode
(
$data
);
return
json_encode
(
$data
);
}
$serv
->
start
();
scripts/crontab/push/newsServerBaseMessage.php
0 → 100644
View file @
83ed1083
<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2019/1/18 0018
* Time: 下午 2:41
*/
\ No newline at end of file
scripts/crontab/push/runRedisBase.php
0 → 100644
View file @
83ed1083
<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2019/1/17 0017
* Time: 下午 2:29
*/
require
APPLICATION_PATH
.
'/scripts/crontab/common.php'
;
error_reporting
(
E_ALL
^
E_NOTICE
);
$conf
=
\Yaf\Registry
::
get
(
'config'
)
->
get
(
'redis.database.params'
);
$redisPublishName
=
\Yaf\Registry
::
get
(
'config'
)
->
get
(
'redis.redisPublishName'
);
function
callback
(
$instance
,
$channelName
,
$message
){
try
{
$message
=
json_decode
(
$message
,
true
);
$callInstance
=
$message
[
'className'
]
::
getInstance
(
\Our\DbNameConst
::
masterDBConnectName
);
echo
json_encode
(
$message
)
.
"
\r\n
"
;
if
(
empty
(
$message
[
'params'
])){
$res
=
call_user_func_array
(
array
(
$callInstance
,
$message
[
'method'
]),
array
());
}
else
{
$res
=
call_user_func_array
(
array
(
$callInstance
,
$message
[
'method'
]),
$message
[
'params'
]);
}
$res
=
(
isset
(
$res
)
&&!
empty
(
$res
))
?
$res
:
false
;
if
(
$res
!==
false
){
echo
'success'
.
"
\r\n
"
;
}
else
{
echo
'fail1'
.
"
\r\n
"
;
}
unset
(
$callInstance
);
\Mysql\LinkMySQLModel
::
unsetDbConecet
();
}
catch
(
Exception
$ex
){
echo
$ex
->
getMessage
();
}
$instance
->
unsubscribe
(
array
(
$channelName
));
$instance
->
close
();
}
$redis
=
new
Redis
();
$redis
->
connect
(
$conf
[
'host'
],
$conf
[
'port'
]);
if
(
!
empty
(
$conf
[
'password'
])){
$redis
->
auth
(
$conf
[
'password'
]);
}
//ini_set('default_socket_timeout', -1);(所有长连接不超时)
$redis
->
setOption
(
Redis
::
OPT_READ_TIMEOUT
,
-
1
);
$result
=
$redis
->
subscribe
(
array
(
$redisPublishName
),
'callback'
);
$redis
->
close
();
echo
'abcdef'
.
"
\r\n
"
;
\ No newline at end of file
scripts/crontab/push/runRedisCanReload.php
0 → 100644
View file @
83ed1083
<?php
define
(
"APPLICATION_PATH"
,
realpath
(
dirname
(
__FILE__
)
.
'/../../../'
));
//指向public的上一级
class
Logger
{
public
function
__construct
(
/*Logging $logger*/
)
{
}
public
function
logger
(
$type
,
$message
)
{
$log
=
sprintf
(
"%s
\t
%s
\t
%s
\n
"
,
date
(
'Y-m-d H:i:s'
),
$type
,
$message
);
file_put_contents
(
sprintf
(
__DIR__
.
"/../log/sender.%s.log"
,
date
(
'Y-m-d'
)),
$log
,
FILE_APPEND
);
}
}
final
class
Signal
{
public
static
$signo
=
0
;
protected
static
$ini
=
null
;
public
static
function
set
(
$signo
){
self
::
$signo
=
$signo
;
}
public
static
function
get
(){
return
(
self
::
$signo
);
}
public
static
function
reset
(){
self
::
$signo
=
0
;
}
}
class
Test
extends
Logger
{
//public static $signal = null;
public
function
__construct
()
{
//self::$signal == null;
}
public
function
run
()
{
require
APPLICATION_PATH
.
'/scripts/crontab/push/runRedisBase.php'
;
pcntl_signal_dispatch
();
printf
(
".123"
);
sleep
(
1
);
if
(
Signal
::
get
()
==
SIGHUP
)
{
Signal
::
reset
();
// break;
}
// while (true) {
// pcntl_signal_dispatch();
// printf(".123");
// sleep(1);
// if (Signal::get() == SIGHUP) {
// Signal::reset();
// break;
// }
//
// }
printf
(
"
\n
"
);
}
}
class
Daemon
extends
Logger
{
/* config */
const
LISTEN
=
"tcp://192.168.2.15:5555"
;
const
pidfile
=
__CLASS__
;
const
uid
=
80
;
const
gid
=
80
;
const
sleep
=
5
;
protected
$pool
=
NULL
;
protected
$config
=
array
();
public
function
__construct
(
$uid
,
$gid
,
$class
)
{
$this
->
pidfile
=
'/var/run/'
.
basename
(
get_class
(
$class
),
'.php'
)
.
'.pid'
;
//$this->config = parse_ini_file('sender.ini', true); //include_once(__DIR__."/config.php");
$this
->
uid
=
$uid
;
$this
->
gid
=
$gid
;
$this
->
class
=
$class
;
$this
->
classname
=
get_class
(
$class
);
$this
->
signal
();
}
public
function
signal
(){
pcntl_signal
(
SIGHUP
,
function
(
$signo
)
/*use ()*/
{
//echo "\n This signal is called. [$signo] \n";
printf
(
"The process has been reload.
\n
"
);
Signal
::
set
(
$signo
);
$this
->
run
();
});
}
private
function
daemon
(){
if
(
file_exists
(
$this
->
pidfile
))
{
echo
"The file
$this->pidfile
exists.
\n
"
;
exit
();
}
$pid
=
pcntl_fork
();
if
(
$pid
==
-
1
)
{
die
(
'could not fork'
);
}
else
if
(
$pid
)
{
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit
(
$pid
);
}
else
{
file_put_contents
(
$this
->
pidfile
,
getmypid
());
posix_setuid
(
self
::
uid
);
posix_setgid
(
self
::
gid
);
return
(
getmypid
());
}
}
private
function
run
(){
while
(
true
){
printf
(
"The process begin.
\n
"
);
$this
->
class
->
run
();
printf
(
"The process end.
\n
"
);
}
}
private
function
foreground
(){
$this
->
run
();
}
private
function
start
(){
$pid
=
$this
->
daemon
();
for
(;;){
$this
->
run
();
sleep
(
self
::
sleep
);
}
}
private
function
stop
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
posix_kill
(
$pid
,
9
);
unlink
(
$this
->
pidfile
);
}
}
private
function
reload
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
//posix_kill(posix_getpid(), SIGHUP);
posix_kill
(
$pid
,
SIGHUP
);
}
}
private
function
status
(){
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
system
(
sprintf
(
"ps ax | grep %s | grep -v grep"
,
$pid
));
}
}
private
function
help
(
$proc
){
printf
(
"%s start | stop | restart | status | foreground | help
\n
"
,
$proc
);
}
public
function
main
(
$argv
){
if
(
count
(
$argv
)
<
2
){
$this
->
help
(
$argv
[
0
]);
printf
(
"please input help parameter
\n
"
);
exit
();
}
if
(
$argv
[
1
]
===
'stop'
){
$this
->
stop
();
}
else
if
(
$argv
[
1
]
===
'start'
){
$this
->
start
();
}
else
if
(
$argv
[
1
]
===
'restart'
){
$this
->
stop
();
$this
->
start
();
}
else
if
(
$argv
[
1
]
===
'status'
){
$this
->
status
();
}
else
if
(
$argv
[
1
]
===
'foreground'
){
$this
->
foreground
();
}
else
if
(
$argv
[
1
]
===
'reload'
){
$this
->
reload
();
}
else
{
$this
->
help
(
$argv
[
0
]);
}
}
}
$daemon
=
new
Daemon
(
80
,
80
,
new
Test
());
$daemon
->
main
(
$argv
);
scripts/crontab/push/runRedisPush.php
View file @
83ed1083
...
@@ -92,8 +92,9 @@ class runRedisPush
...
@@ -92,8 +92,9 @@ class runRedisPush
private
function
reload
(){
private
function
reload
(){
if
(
file_exists
(
$this
->
pidfile
))
{
if
(
file_exists
(
$this
->
pidfile
))
{
$pid
=
file_get_contents
(
$this
->
pidfile
);
$pid
=
file_get_contents
(
$this
->
pidfile
);
posix_kill
(
$pid
,
SIGUSR1
);
//posix_kill(posix_getpid(), SIGHUP);
//posix_kill(posix_getpid(), SIGHUP);
posix_kill
(
$pid
,
SIGHUP
);
//
posix_kill($pid, SIGHUP);
}
}
}
}
protected
function
restart
(){
protected
function
restart
(){
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment