count = 1;

// Worker bootstrap: on start, open one websocket connection to the Binance
// futures stream endpoint, subscribe to the "<symbol>usdt@depth10" stream of
// every enabled contract pair, and for each depth message cache the buy/sell
// lists in Redis and push them to the matching GatewayWorker groups.
$worker->onWorkerStart = function ($worker) {
    Gateway::$registerAddress = '127.0.0.1:1238';

    // Port 443 together with the 'ssl' transport below turns this ws:// URL into a wss connection.
    $con = new AsyncTcpConnection('ws://fstream.binance.com:443/ws');
    $con->transport = 'ssl';

    /**
     * Converts raw depth levels into the entry shape that is cached and broadcast.
     *
     * @param iterable $levels     Depth levels; each level is read as [0 => price, 1 => amount].
     * @param bool     $riskActive Whether the risk-control price shift applies.
     * @param mixed    $priceShift Value added to every price when $riskActive is true.
     * @return array Entries keyed like $levels, each holding 'id', 'amount' and 'price'.
     */
    $buildDepthList = static function ($levels, bool $riskActive, $priceShift): array {
        $entries = [];
        foreach ($levels as $index => $level) {
            $entries[$index] = [
                'id' => Str::uuid()->toString(),
                // A zero amount is replaced by 1. Loose comparison on purpose:
                // amounts may arrive as numeric strings such as "0.000".
                'amount' => $level[1] == 0 ? 1 : $level[1],
                // The shift is added ('+') for the buy side and the sell side alike.
                'price' => $riskActive
                    ? PriceCalculate($level[0], '+', $priceShift, 8)
                    : $level[0],
            ];
        }

        return $entries;
    };

    /**
     * Sends a depth list to a Gateway group, but only when that group has clients.
     *
     * @param string $group   Gateway group id; also echoed back in the 'sub' field.
     * @param array  $entries Depth entries to deliver in the 'data' field.
     */
    $broadcast = static function (string $group, array $entries): void {
        if (Gateway::getClientIdCountByGroup($group) > 0) {
            Gateway::sendToGroup($group, json_encode([
                'code' => 0,
                'msg' => 'success',
                'data' => $entries,
                'sub' => $group,
            ]));
        }
    };

    $con->onConnect = function ($con) {
        // All enabled trading pairs.
        $symbols = \App\Models\ContractPair::query()->where('status', 1)->pluck('symbol');

        $streams = [];
        foreach ($symbols as $symbol) {
            $streams[] = strtolower($symbol) . 'usdt@depth10';
        }

        // Without any enabled pair there is nothing to subscribe to; do not
        // send a SUBSCRIBE request with an empty/null "params" field.
        if ($streams === []) {
            echo "no enabled contract pairs, nothing to subscribe\n";
            return;
        }

        $con->send(json_encode([
            'method' => 'SUBSCRIBE',
            'params' => $streams,
            'id' => intval(rand(100000, 999999) . time()),
        ]));
    };

    $con->onMessage = function ($con, $data) use ($buildDepthList, $broadcast) {
        $data = json_decode($data, true);
        if (!is_array($data)) {
            // Not a JSON object/array frame: nothing to process.
            return;
        }

        // Answer application-level pings with the same payload.
        if (isset($data['ping'])) {
            $con->send(json_encode(['pong' => $data['ping']]));
        }

        // Only messages carrying 'u' (and the symbol 's') are handled as depth data.
        if (!isset($data['u'], $data['s'])) {
            return;
        }

        // Strip the quote currency: "BTCUSDT" -> "BTC".
        $symbol = Str::before($data['s'], 'USDT');

        // Load the risk-control settings for this pair; a missing key means "no risk control".
        $riskJson = Redis::get('fkJson:' . $symbol . '/USDT');
        $risk = is_string($riskJson) ? json_decode($riskJson, true) : null;
        $riskActive = !blank($risk) && ($risk['enabled'] ?? 0) == 1;
        // The shift is the same for every level of this message, so compute it once.
        $priceShift = $riskActive ? ($risk['minUnit'] ?? 0) * ($risk['count'] ?? 0) : 0;

        // 'b' feeds the buy list and 'a' the sell list.
        $buyEntries = $buildDepthList($data['b'] ?? [], $riskActive, $priceShift);
        $sellEntries = $buildDepthList($data['a'] ?? [], $riskActive, $priceShift);

        $cache = Cache::store('redis');
        $cache->put('swap:' . $symbol . '_depth_buy', $buyEntries);
        $cache->put('swap:' . $symbol . '_depth_sell', $sellEntries);

        // If an extra entry is stored under swap_buyList_<symbol> / swap_sellList_<symbol>,
        // consume it (read, then delete) and put it at the head of the outgoing list.
        // The depth cached above is not affected.
        if ($pendingBuy = $cache->get('swap_buyList_' . $symbol)) {
            $cache->forget('swap_buyList_' . $symbol);
            array_unshift($buyEntries, $pendingBuy);
        }
        if ($pendingSell = $cache->get('swap_sellList_' . $symbol)) {
            $cache->forget('swap_sellList_' . $symbol);
            array_unshift($sellEntries, $pendingSell);
        }

        // Each group is checked on its own so sell-only subscribers are not
        // skipped when nobody listens on the buy group.
        $broadcast('swapBuyList_' . $symbol, $buyEntries);
        $broadcast('swapSellList_' . $symbol, $sellEntries);
    };

    $con->onClose = function ($con) {
        // Delayed reconnect: if the remote side goes down this client cannot
        // connect either. Change the 1 to a suitable number of seconds if needed;
        // here the connection is re-established 1 second after it closes.
        $con->reConnect(1);
    };

    $con->onError = function ($con, $code, $msg) {
        echo "error $code $msg\n";
    };

    $con->connect();
};

Worker::runAll();