count = 1;
// NOTE(review): "count = 1;" above is the visible tail of a statement that begins before this
// chunk (presumably "$worker->count = 1;") — confirm against the complete file.

/**
 * Worker bootstrap: opens a WebSocket connection to the Binance futures stream, subscribes to the
 * "<symbol>usdt@ticker" stream of every configured symbol and mirrors each ticker into the Redis
 * cache under "swap:<symbol>_detail" and "swap:<symbol>_detail_new".
 *
 * Besides the real symbols, every Klike row maps a source coin ("like_coin") to a derived coin
 * ("air_coin") whose prices are the source prices multiplied or divided by "percent".
 */
$worker->onWorkerStart = function ($worker) {
    Gateway::$registerAddress = '127.0.0.1:1238';

    /**
     * Build the market-overview payload for one symbol from a ticker message and store it.
     *
     * @param string   $symbol Symbol without the USDT suffix; used in the Redis/cache keys.
     * @param array    $ticker Decoded ticker message (keys E, o, c, h, l, q, v, F, n, p, P).
     * @param int|null $point  When not null, open/close/high/low are formatted to this many
     *                         decimals before being cached; null leaves them untouched.
     */
    $publishDetail = static function ($symbol, array $ticker, $point = null) {
        $priceFields = ['close', 'open', 'high', 'low'];

        // Market overview
        $cache_data = [
            "id" => $ticker['E'],
            "mrid" => $ticker['E'],
            "open" => $ticker['o'],
            "close" => $ticker['c'],
            "high" => $ticker['h'],
            "low" => $ticker['l'],
            "amount" => $ticker['q'],
            'vol' => $ticker['v'],
            'trade_turnover' => $ticker['F'],
            'count' => $ticker['n'],
            'increase' => $ticker['p'],
            'increaseStr' => $ticker['P'],
        ];

        // Risk-control task: when enabled, shift every price by minUnit * count.
        // Redis::get() may yield null/false for a missing key, hence the string cast.
        $risk = json_decode((string) Redis::get('fkJson:' . $symbol . '/USDT'), true);
        $minUnit = $risk['minUnit'] ?? 0;
        $count = $risk['count'] ?? 0;
        $enabled = $risk['enabled'] ?? 0;
        if (!blank($risk) && $enabled == 1) {
            $change = $minUnit * $count;
            foreach ($priceFields as $field) {
                // PriceCalculate is a project helper; presumably a fixed-scale (8) addition here.
                $cache_data[$field] = PriceCalculate($cache_data[$field], '+', $change, 8);
            }
        }

        $increase = 0;
        if (isset($cache_data['open']) && $cache_data['open'] != 0) {
            // Prefer the open of the cached 1-day kline as the reference price for the change
            // ratio; fall back to the ticker's own open when the kline is absent or carries no
            // usable (non-zero) open, so the divisor can never be zero.
            $day_kline = Cache::store('redis')->get('swap:' . $symbol . '_kline_' . '1day');
            $reference = $cache_data['open'];
            if (!blank($day_kline)) {
                $klineOpen = $day_kline['open'] ?? 0;
                if (is_numeric($klineOpen) && $klineOpen != 0) {
                    $reference = $klineOpen;
                }
            }
            $increase = PriceCalculate(($cache_data['close'] - $reference), '/', $reference, 4);
        }

        // Formatting happens after the ratio is computed so rounding does not affect it.
        if ($point !== null) {
            foreach ($priceFields as $field) {
                $cache_data[$field] = number_format((float) $cache_data[$field], $point, '.', '');
            }
        }

        $cache_data['increase'] = $increase;
        $flag = $increase >= 0 ? '+' : '';
        $cache_data['increaseStr'] = $increase == 0 ? '+0.00%' : $flag . ($increase * 100) . '%';

        $key = 'swap:' . $symbol . '_detail';
        Cache::store('redis')->put($key, $cache_data);
        Cache::store('redis')->put($key . '_new', $cache_data);
    };

    $con = new AsyncTcpConnection('ws://fstream.binance.com:443/ws');
    // Use SSL transport so the connection is effectively wss.
    $con->transport = 'ssl';

    // Subscribe to the ticker stream of every enabled contract pair and every Klike source coin.
    $con->onConnect = function ($con) {
        $symbols = \App\Models\ContractPair::query()->where('status', 1)->pluck('symbol');
        $airCoins = \App\Models\Klike::query()->pluck('like_coin');

        $symbol_list = [];
        foreach (array_collapse([$symbols, $airCoins]) as $symbol) {
            $symbol_list[] = strtolower($symbol) . "usdt@ticker";
        }
        // De-duplicate the final stream names so symbols differing only by case collapse too.
        $symbol_list = array_values(array_unique($symbol_list));

        if ($symbol_list === []) {
            // Nothing configured: do not send a SUBSCRIBE request without params.
            return;
        }

        $msg1 = ["method" => "SUBSCRIBE", "params" => $symbol_list, "id" => intval(rand(100000, 999999) . time())];
        $con->send(json_encode($msg1));
    };

    // Handle one incoming frame: answer application-level pings and mirror ticker events.
    $con->onMessage = function ($con, $data) use ($publishDetail) {
        $data = json_decode($data, true);
        if (!is_array($data)) {
            return;
        }

        if (isset($data['ping'])) {
            $msg = ["pong" => $data['ping']];
            $con->send(json_encode($msg));
        }

        if (!isset($data['s'])) {
            return;
        }

        // Mini-ticker events are never cached, neither for the symbol nor for its air coins.
        if (($data['e'] ?? null) == '24hrMiniTicker') {
            return;
        }

        $symbol = str_before($data['s'], 'USDT');
        if ($symbol) {
            $publishDetail($symbol, $data);
        }

        // NOTE(review): this runs one database query per ticker message; consider caching the
        // mapping if message volume makes it a bottleneck.
        $airCoins = \App\Models\Klike::query()->where('like_coin', $symbol)->get()->toArray();
        foreach ($airCoins as $item) {
            // Resolve the derived symbol per row so nothing leaks over from a previous iteration.
            if ($item['like_coin'] != $symbol || !$item['air_coin']) {
                continue;
            }

            // Always scale from the ORIGINAL ticker: scaling $data in place would compound the
            // factors of earlier rows when several air coins share the same source coin.
            $scaled = $data;
            $percent = $item['percent'];
            if ($item['calculation'] == '*') {
                foreach (['o', 'c', 'h', 'l'] as $field) {
                    $scaled[$field] = $data[$field] * $percent;
                }
            } elseif ($item['calculation'] == '/') {
                if ($percent == 0) {
                    // A zero divisor is a misconfigured row; skip it instead of failing the callback.
                    continue;
                }
                foreach (['o', 'c', 'h', 'l'] as $field) {
                    $scaled[$field] = $data[$field] / $percent;
                }
            }

            $publishDetail($item['air_coin'], $scaled, (int) ($item['point'] ?? 0));
        }
    };

    // Delayed reconnect: if the remote side goes away (e.g. the server is down), try again after
    // the given number of seconds. 1 means "reconnect one second after the connection closes";
    // raise it if a longer back-off is wanted.
    $con->onClose = function ($con) {
        $con->reConnect(1);
    };

    $con->onError = function ($con, $code, $msg) {
        echo "error $code $msg\n";
    };

    $con->connect();
};

Worker::runAll();