自动更新管控端
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

296 lines
8.8 KiB

/*
* rpc_server.c
* RPC服务器实现,处理客户端连接和执行远程函数调用
*/
#include "rpc_common.h"
#include "rpc_transport.h"
#include "rpc_message.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
/* 函数处理函数类型定义 */
typedef int (*rpc_handler_func_t)(rpc_param_t* params, int args_count, rpc_param_t* return_value);
/* 函数处理器结构 */
typedef struct {
char func_name[MAX_FUNC_NAME_LEN];
rpc_handler_func_t handler;
rpc_param_type_t return_type;
rpc_param_type_t param_types[MAX_ARGS_COUNT];
int param_count;
} rpc_handler_t;
/* 函数处理器表 */
static rpc_handler_t handlers[MAX_FUNC_NAME_LEN];
static int handler_count = 0;
/*
* 注册远程函数
*/
int rpc_register_function(const char* func_name, rpc_handler_func_t handler,
rpc_param_type_t return_type,
rpc_param_type_t* param_types, int param_count) {
if (!func_name || !handler || param_count < 0 || param_count > MAX_ARGS_COUNT) {
return RPC_INVALID_ARGS;
}
if (handler_count >= MAX_FUNC_NAME_LEN) {
return RPC_ERROR;
}
// 检查函数名是否已存在
for (int i = 0; i < handler_count; i++) {
if (strcmp(handlers[i].func_name, func_name) == 0) {
return RPC_ERROR;
}
}
// 添加新的函数处理器
strncpy(handlers[handler_count].func_name, func_name, MAX_FUNC_NAME_LEN - 1);
handlers[handler_count].func_name[MAX_FUNC_NAME_LEN - 1] = '\0';
handlers[handler_count].handler = handler;
handlers[handler_count].return_type = return_type;
handlers[handler_count].param_count = param_count;
// 复制参数类型
if (param_count > 0 && param_types) {
memcpy(handlers[handler_count].param_types, param_types,
sizeof(rpc_param_type_t) * param_count);
}
handler_count++;
printf("Registered function: %s\n", func_name);
return RPC_SUCCESS;
}
/*
* 查找函数处理器
*/
rpc_handler_t* rpc_find_handler(const char* func_name) {
for (int i = 0; i < handler_count; i++) {
if (strcmp(handlers[i].func_name, func_name) == 0) {
return &handlers[i];
}
}
return NULL;
}
/*
* 处理客户端请求
*/
#ifdef _WIN32
DWORD WINAPI handle_client(LPVOID arg) {
#else
void* handle_client(void* arg) {
#endif
rpc_transport_t* transport = (rpc_transport_t*)arg;
rpc_request_t request;
rpc_response_t response;
int ret;
while (1) {
// 接收请求
ret = rpc_recv_request(transport, &request);
if (ret != RPC_SUCCESS) {
printf("Failed to receive request: %s\n", rpc_error_to_string(ret));
break;
}
printf("Received request: %s with %d arguments\n", request.func_name, request.args_count);
// 查找函数处理器
rpc_handler_t* handler = rpc_find_handler(request.func_name);
if (!handler) {
printf("Function not found: %s\n", request.func_name);
response.result_code = RPC_FUNC_NOT_FOUND;
response.return_type = RPC_TYPE_VOID;
rpc_send_response(transport, &response);
rpc_free_request(&request);
continue;
}
// 检查参数数量
if (request.args_count != handler->param_count) {
printf("Invalid argument count for %s\n", request.func_name);
response.result_code = RPC_INVALID_ARGS;
response.return_type = RPC_TYPE_VOID;
rpc_send_response(transport, &response);
rpc_free_request(&request);
continue;
}
// 执行函数
rpc_param_t return_value;
rpc_init_param(&return_value, handler->return_type);
ret = handler->handler(request.args, request.args_count, &return_value);
// 设置响应
response.result_code = ret;
response.return_type = handler->return_type;
// 复制返回值
switch (handler->return_type) {
case RPC_TYPE_INT:
response.return_value.int_val = return_value.value.int_val;
break;
case RPC_TYPE_FLOAT:
response.return_value.float_val = return_value.value.float_val;
break;
case RPC_TYPE_DOUBLE:
response.return_value.double_val = return_value.value.double_val;
break;
case RPC_TYPE_STRING:
// 注意:这里需要复制字符串,避免内存问题
response.return_value.string_val = strdup(return_value.value.string_val);
break;
case RPC_TYPE_BOOL:
response.return_value.bool_val = return_value.value.bool_val;
break;
case RPC_TYPE_VOID:
default:
break;
}
// 发送响应
rpc_send_response(transport, &response);
// 释放资源
rpc_free_request(&request);
if (handler->return_type == RPC_TYPE_STRING && return_value.value.string_val) {
free((void*)return_value.value.string_val);
}
if (response.return_type == RPC_TYPE_STRING && response.return_value.string_val) {
free((void*)response.return_value.string_val);
}
}
// 关闭连接
rpc_transport_close(transport);
free(transport);
#ifdef _WIN32
return 0;
#else
return NULL;
#endif
}
/*
* 示例函数:加法
*/
int add_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
if (args_count < 2 || params[0].type != RPC_TYPE_INT || params[1].type != RPC_TYPE_INT) {
return RPC_INVALID_ARGS;
}
int a = params[0].value.int_val;
int b = params[1].value.int_val;
return_value->value.int_val = a + b;
printf("Executed add(%d, %d) = %d\n", a, b, return_value->value.int_val);
return RPC_SUCCESS;
}
/*
* 示例函数:获取服务器信息
*/
int get_server_info_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
const char* info = "RPC Server v1.0";
return_value->value.string_val = strdup(info);
printf("Executed get_server_info()\n");
return RPC_SUCCESS;
}
/*
* 主函数
*/
int main(int argc, char* argv[]) {
rpc_server_t server;
int port = 8080;
#ifdef _WIN32
// 初始化Windows套接字库
if (rpc_winsock_init() != RPC_SUCCESS) {
printf("Failed to initialize Winsock\n");
return 1;
}
#endif
// 注册示例函数
rpc_param_type_t add_params[] = {RPC_TYPE_INT, RPC_TYPE_INT};
rpc_register_function("add", add_handler, RPC_TYPE_INT, add_params, 2);
rpc_register_function("get_server_info", get_server_info_handler, RPC_TYPE_STRING, NULL, 0);
// 初始化服务器
if (rpc_server_init(&server, "0.0.0.0", port, 5) != RPC_SUCCESS) {
printf("Failed to initialize server\n");
return 1;
}
printf("RPC Server listening on port %d\n", port);
printf("Waiting for client connections...\n");
while (1) {
// 接受客户端连接
rpc_transport_t* client_transport = (rpc_transport_t*)malloc(sizeof(rpc_transport_t));
if (!client_transport) {
printf("Failed to allocate memory\n");
continue;
}
if (rpc_server_accept(&server, client_transport) != RPC_SUCCESS) {
free(client_transport);
continue;
}
// 创建线程处理客户端请求
#ifdef _WIN32
HANDLE thread_handle = CreateThread(
NULL, // 默认安全属性
0, // 默认堆栈大小
handle_client, // 线程函数
client_transport, // 线程参数
0, // 默认创建标志
NULL // 线程ID(不需要)
);
if (thread_handle == NULL) {
fprintf(stderr, "CreateThread failed with error code: %d\n", GetLastError());
rpc_transport_close(client_transport);
free(client_transport);
continue;
}
// 关闭线程句柄但不终止线程,让线程自动结束
CloseHandle(thread_handle);
#else
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, handle_client, client_transport) != 0) {
perror("pthread_create failed");
rpc_transport_close(client_transport);
free(client_transport);
continue;
}
// 分离线程,自动回收资源
pthread_detach(thread_id);
#endif
}
// 关闭服务器
rpc_server_close(&server);
#ifdef _WIN32
// 清理Windows套接字库
rpc_winsock_cleanup();
#endif
return 0;
}