Browse Source

变更代码

master
xyiege 6 months ago
parent
commit
8e9ec43d1c
  1. 2
      scalib/include/rpc_transport.h
  2. 215
      scalib/src/rpc_common.c
  3. 570
      scalib/src/rpc_message.c
  4. 885
      scalib/src/rpc_server.c
  5. 678
      scalib/src/rpc_transport.c
  6. 40
      scalib/test_cmake_config.cmd
  7. 33
      scalib/test_compile.cmd

2
scalib/include/rpc_transport.h

@ -48,8 +48,8 @@ typedef struct {
typedef struct { typedef struct {
socket_t server_fd; // 服务器套接字文件描述符 socket_t server_fd; // 服务器套接字文件描述符
struct sockaddr_in address; // 服务器地址信息 struct sockaddr_in address; // 服务器地址信息
} rpc_server_t; } rpc_server_t;
/* 传输层函数声明 */ /* 传输层函数声明 */
// 初始化服务器 // 初始化服务器

215
scalib/src/rpc_common.c

@ -1,97 +1,120 @@
/* /*
* rpc_common.c * rpc_common.c
* RPC框架的公共工具函数 * RPC框架的公共工具函数
*/ */
#include "rpc_common.h" #include "rpc_common.h"
/* /*
* RPC参数 * RPC参数
*/ */
void rpc_init_param(rpc_param_t* param, rpc_param_type_t type) { void rpc_init_param(rpc_param_t* param, rpc_param_type_t type) {
if (!param) return; if (!param) return;
param->type = type;
param->type = type; switch (type) {
switch (type) { case RPC_TYPE_INT:
case RPC_TYPE_INT: param->value.int_val = 0;
param->value.int_val = 0; break;
break;
case RPC_TYPE_FLOAT: case RPC_TYPE_FLOAT:
param->value.float_val = 0.0f; param->value.float_val = 0.0f;
break; break;
case RPC_TYPE_DOUBLE:
param->value.double_val = 0.0; case RPC_TYPE_DOUBLE:
break; param->value.double_val = 0.0;
case RPC_TYPE_STRING: break;
param->value.string_val = NULL;
break; case RPC_TYPE_STRING:
case RPC_TYPE_BOOL: param->value.string_val = NULL;
param->value.bool_val = false; break;
break;
case RPC_TYPE_VOID: case RPC_TYPE_BOOL:
default: param->value.bool_val = false;
break; break;
}
} case RPC_TYPE_VOID:
break;
/*
* default:
*/ break;
void rpc_free_params(rpc_param_t* params, int count) {
if (!params || count <= 0) return; }
for (int i = 0; i < count; i++) { }
if (params[i].type == RPC_TYPE_STRING && params[i].value.string_val) {
free((void*)params[i].value.string_val);
params[i].value.string_val = NULL; /*
} *
} */
} void rpc_free_params(rpc_param_t* params, int count) {
if (!params || count <= 0) return;
/*
* for (int i = 0; i < count; i++) {
*/ if (params[i].type == RPC_TYPE_STRING && params[i].value.string_val) {
void rpc_free_request(rpc_request_t* request) { free((void*)params[i].value.string_val);
if (!request) return; params[i].value.string_val = NULL;
}
rpc_free_params(request->args, request->args_count); }
request->args_count = 0; }
request->func_name[0] = '\0';
} /*
*
/* */
* void rpc_free_request(rpc_request_t* request) {
*/ if (!request) return;
void rpc_free_response(rpc_response_t* response) { rpc_free_params(request->args, request->args_count);
if (!response) return; request->args_count = 0;
request->func_name[0] = '\0';
if (response->return_type == RPC_TYPE_STRING && response->return_value.string_val) { }
free((void*)response->return_value.string_val);
response->return_value.string_val = NULL;
}
/*
response->result_code = 0; *
response->return_type = RPC_TYPE_VOID; */
}
void rpc_free_response(rpc_response_t* response) {
/* if (!response) return;
*
*/ if (response->return_type == RPC_TYPE_STRING && response->return_value.string_val) {
const char* rpc_error_to_string(int error_code) { free((void*)response->return_value.string_val);
switch (error_code) { response->return_value.string_val = NULL;
case RPC_SUCCESS: }
return "Success";
case RPC_ERROR: response->result_code = 0;
return "General error"; response->return_type = RPC_TYPE_VOID;
case RPC_NET_ERROR:
return "Network error"; }
case RPC_TIMEOUT:
return "Timeout error";
case RPC_INVALID_ARGS:
return "Invalid arguments"; /*
case RPC_FUNC_NOT_FOUND: *
return "Function not found"; */
default:
return "Unknown error"; const char* rpc_error_to_string(int error_code) {
} switch (error_code) {
case RPC_SUCCESS:
return "Success";
case RPC_ERROR:
return "General error";
case RPC_NET_ERROR:
return "Network error";
case RPC_TIMEOUT:
return "Timeout error";
case RPC_INVALID_ARGS:
return "Invalid arguments";
case RPC_FUNC_NOT_FOUND:
return "Function not found";
default:
return "Unknown error";
}
} }

570
scalib/src/rpc_message.c

@ -1,191 +1,381 @@
/* /*
* rpc_message.c
* RPC消息的序列化和反序列化功能 * rpc_message.c
*/
* RPC消息的序列化和反序列化功能
#include "rpc_message.h"
#include "rpc_transport.h" */
/*
*
*/ #include "rpc_message.h"
int rpc_serialize_request(const rpc_request_t* request, void* buffer, size_t buffer_size) {
if (!request || !buffer || buffer_size < sizeof(rpc_request_t)) { #include "rpc_transport.h"
return RPC_INVALID_ARGS;
}
// 复制请求消息到缓冲区 /*
memcpy(buffer, request, sizeof(rpc_request_t));
*
// 注意:这里简化了字符串参数的序列化,实际上可能需要更复杂的处理
// 在实际应用中,可能需要计算字符串的总长度并动态分配足够的缓冲区 */
return RPC_SUCCESS; int rpc_serialize_request(const rpc_request_t* request, void* buffer, size_t buffer_size) {
}
if (!request || !buffer || buffer_size < sizeof(rpc_request_t)) {
/*
* return RPC_INVALID_ARGS;
*/
int rpc_deserialize_request(const void* buffer, size_t buffer_size, rpc_request_t* request) { }
if (!buffer || !request || buffer_size < sizeof(rpc_request_t)) {
return RPC_INVALID_ARGS;
}
// 复制请求消息到缓冲区
// 从缓冲区复制请求消息
memcpy(request, buffer, sizeof(rpc_request_t)); memcpy(buffer, request, sizeof(rpc_request_t));
// 注意:这里简化了字符串参数的反序列化
// 在实际应用中,可能需要处理字符串的动态分配和复制
// 注意:这里简化了字符串参数的序列化,实际上可能需要更复杂的处理
return RPC_SUCCESS;
} // 在实际应用中,可能需要计算字符串的总长度并动态分配足够的缓冲区
/*
*
*/ return RPC_SUCCESS;
int rpc_serialize_response(const rpc_response_t* response, void* buffer, size_t buffer_size) {
if (!response || !buffer || buffer_size < sizeof(rpc_response_t)) { }
return RPC_INVALID_ARGS;
}
// 复制响应消息到缓冲区 /*
memcpy(buffer, response, sizeof(rpc_response_t));
*
// 注意:这里简化了字符串返回值的序列化
*/
return RPC_SUCCESS;
} int rpc_deserialize_request(const void* buffer, size_t buffer_size, rpc_request_t* request) {
/* if (!buffer || !request || buffer_size < sizeof(rpc_request_t)) {
*
*/ return RPC_INVALID_ARGS;
int rpc_deserialize_response(const void* buffer, size_t buffer_size, rpc_response_t* response) {
if (!buffer || !response || buffer_size < sizeof(rpc_response_t)) { }
return RPC_INVALID_ARGS;
}
// 从缓冲区复制响应消息 // 从缓冲区复制请求消息
memcpy(response, buffer, sizeof(rpc_response_t));
memcpy(request, buffer, sizeof(rpc_request_t));
// 注意:这里简化了字符串返回值的反序列化
return RPC_SUCCESS;
} // 注意:这里简化了字符串参数的反序列化
/* // 在实际应用中,可能需要处理字符串的动态分配和复制
*
*/
int rpc_send_request(rpc_transport_t* transport, const rpc_request_t* request) {
if (!transport || !request) { return RPC_SUCCESS;
return RPC_INVALID_ARGS;
} }
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_t message;
/*
// 设置消息头部
message.header.type = RPC_MESSAGE_REQUEST; *
message.header.payload_size = sizeof(rpc_request_t);
*/
// 复制请求到消息负载
message.payload.request = *request; int rpc_serialize_response(const rpc_response_t* response, void* buffer, size_t buffer_size) {
// 序列化整个消息 if (!response || !buffer || buffer_size < sizeof(rpc_response_t)) {
size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size;
if (message_size > MAX_MESSAGE_SIZE) { return RPC_INVALID_ARGS;
return RPC_ERROR;
} }
memcpy(buffer, &message, message_size);
// 发送消息 // 复制响应消息到缓冲区
return rpc_transport_send(transport, buffer, message_size);
} memcpy(buffer, response, sizeof(rpc_response_t));
/*
*
*/ // 注意:这里简化了字符串返回值的序列化
int rpc_recv_request(rpc_transport_t* transport, rpc_request_t* request) {
if (!transport || !request) {
return RPC_INVALID_ARGS;
} return RPC_SUCCESS;
uint8_t buffer[MAX_MESSAGE_SIZE]; }
rpc_message_header_t header;
// 接收消息头部
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) { /*
return RPC_NET_ERROR;
} *
// 检查消息类型和大小 */
if (header.type != RPC_MESSAGE_REQUEST || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) {
return RPC_ERROR; int rpc_deserialize_response(const void* buffer, size_t buffer_size, rpc_response_t* response) {
}
if (!buffer || !response || buffer_size < sizeof(rpc_response_t)) {
// 接收消息负载
if (rpc_transport_recv(transport, request, header.payload_size) != RPC_SUCCESS) { return RPC_INVALID_ARGS;
return RPC_NET_ERROR;
} }
return RPC_SUCCESS;
}
// 从缓冲区复制响应消息
/*
* memcpy(response, buffer, sizeof(rpc_response_t));
*/
int rpc_send_response(rpc_transport_t* transport, const rpc_response_t* response) {
if (!transport || !response) {
return RPC_INVALID_ARGS; // 注意:这里简化了字符串返回值的反序列化
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_t message; return RPC_SUCCESS;
// 设置消息头部 }
message.header.type = RPC_MESSAGE_RESPONSE;
message.header.payload_size = sizeof(rpc_response_t);
// 复制响应到消息负载 /*
message.payload.response = *response;
*
// 序列化整个消息
size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size; */
if (message_size > MAX_MESSAGE_SIZE) {
return RPC_ERROR; int rpc_send_request(rpc_transport_t* transport, const rpc_request_t* request) {
}
if (!transport || !request) {
memcpy(buffer, &message, message_size);
return RPC_INVALID_ARGS;
// 发送消息
return rpc_transport_send(transport, buffer, message_size); }
}
/*
* uint8_t buffer[MAX_MESSAGE_SIZE];
*/
int rpc_recv_response(rpc_transport_t* transport, rpc_response_t* response) { rpc_message_t message;
if (!transport || !response) {
return RPC_INVALID_ARGS;
}
// 设置消息头部
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_header_t header; message.header.type = RPC_MESSAGE_REQUEST;
// 接收消息头部 message.header.payload_size = sizeof(rpc_request_t);
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
// 复制请求到消息负载
// 检查消息类型和大小
if (header.type != RPC_MESSAGE_RESPONSE || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) { message.payload.request = *request;
return RPC_ERROR;
}
// 接收消息负载 // 序列化整个消息
if (rpc_transport_recv(transport, response, header.payload_size) != RPC_SUCCESS) {
return RPC_NET_ERROR; size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size;
}
if (message_size > MAX_MESSAGE_SIZE) {
return RPC_SUCCESS;
return RPC_ERROR;
}
memcpy(buffer, &message, message_size);
// 发送消息
return rpc_transport_send(transport, buffer, message_size);
}
/*
*
*/
int rpc_recv_request(rpc_transport_t* transport, rpc_request_t* request) {
if (!transport || !request) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_header_t header;
// 接收消息头部
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
// 检查消息类型和大小
if (header.type != RPC_MESSAGE_REQUEST || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) {
return RPC_ERROR;
}
// 接收消息负载
if (rpc_transport_recv(transport, request, header.payload_size) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
return RPC_SUCCESS;
}
/*
*
*/
int rpc_send_response(rpc_transport_t* transport, const rpc_response_t* response) {
if (!transport || !response) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_t message;
// 设置消息头部
message.header.type = RPC_MESSAGE_RESPONSE;
message.header.payload_size = sizeof(rpc_response_t);
// 复制响应到消息负载
message.payload.response = *response;
// 序列化整个消息
size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size;
if (message_size > MAX_MESSAGE_SIZE) {
return RPC_ERROR;
}
memcpy(buffer, &message, message_size);
// 发送消息
return rpc_transport_send(transport, buffer, message_size);
}
/*
*
*/
int rpc_recv_response(rpc_transport_t* transport, rpc_response_t* response) {
if (!transport || !response) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_header_t header;
// 接收消息头部
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
// 检查消息类型和大小
if (header.type != RPC_MESSAGE_RESPONSE || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) {
return RPC_ERROR;
}
// 接收消息负载
if (rpc_transport_recv(transport, response, header.payload_size) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
return RPC_SUCCESS;
} }

885
scalib/src/rpc_server.c

@ -1,296 +1,591 @@
/* /*
* rpc_server.c
* RPC服务器实现 * rpc_server.c
*/
* RPC服务器实现
#include "rpc_common.h"
#include "rpc_transport.h" */
#include "rpc_message.h"
#ifdef _WIN32
#include <windows.h> #include "rpc_common.h"
#else
#include <pthread.h> #include "rpc_transport.h"
#endif
#include "rpc_message.h"
/* 函数处理函数类型定义 */
typedef int (*rpc_handler_func_t)(rpc_param_t* params, int args_count, rpc_param_t* return_value);
/* 函数处理器结构 */ #ifdef _WIN32
typedef struct {
char func_name[MAX_FUNC_NAME_LEN]; #include <windows.h>
rpc_handler_func_t handler;
rpc_param_type_t return_type; #else
rpc_param_type_t param_types[MAX_ARGS_COUNT];
int param_count; #include <pthread.h>
} rpc_handler_t;
#endif
/* 函数处理器表 */
static rpc_handler_t handlers[MAX_FUNC_NAME_LEN];
static int handler_count = 0;
/* 函数处理函数类型定义 */
/*
* typedef int (*rpc_handler_func_t)(rpc_param_t* params, int args_count, rpc_param_t* return_value);
*/
int rpc_register_function(const char* func_name, rpc_handler_func_t handler,
rpc_param_type_t return_type,
rpc_param_type_t* param_types, int param_count) { /* 函数处理器结构 */
if (!func_name || !handler || param_count < 0 || param_count > MAX_ARGS_COUNT) {
return RPC_INVALID_ARGS; typedef struct {
}
char func_name[MAX_FUNC_NAME_LEN];
if (handler_count >= MAX_FUNC_NAME_LEN) {
return RPC_ERROR; rpc_handler_func_t handler;
}
rpc_param_type_t return_type;
// 检查函数名是否已存在
for (int i = 0; i < handler_count; i++) { rpc_param_type_t param_types[MAX_ARGS_COUNT];
if (strcmp(handlers[i].func_name, func_name) == 0) {
return RPC_ERROR; int param_count;
}
} } rpc_handler_t;
// 添加新的函数处理器
strncpy(handlers[handler_count].func_name, func_name, MAX_FUNC_NAME_LEN - 1);
handlers[handler_count].func_name[MAX_FUNC_NAME_LEN - 1] = '\0'; /* 函数处理器表 */
handlers[handler_count].handler = handler;
handlers[handler_count].return_type = return_type; static rpc_handler_t handlers[MAX_FUNC_NAME_LEN];
handlers[handler_count].param_count = param_count;
static int handler_count = 0;
// 复制参数类型
if (param_count > 0 && param_types) {
memcpy(handlers[handler_count].param_types, param_types,
sizeof(rpc_param_type_t) * param_count); /*
}
*
handler_count++;
printf("Registered function: %s\n", func_name); */
return RPC_SUCCESS;
} int rpc_register_function(const char* func_name, rpc_handler_func_t handler,
/* rpc_param_type_t return_type,
*
*/ rpc_param_type_t* param_types, int param_count) {
rpc_handler_t* rpc_find_handler(const char* func_name) {
for (int i = 0; i < handler_count; i++) { if (!func_name || !handler || param_count < 0 || param_count > MAX_ARGS_COUNT) {
if (strcmp(handlers[i].func_name, func_name) == 0) {
return &handlers[i]; return RPC_INVALID_ARGS;
}
} }
return NULL;
}
/* if (handler_count >= MAX_FUNC_NAME_LEN) {
*
*/ return RPC_ERROR;
#ifdef _WIN32
DWORD WINAPI handle_client(LPVOID arg) { }
#else
void* handle_client(void* arg) {
#endif
rpc_transport_t* transport = (rpc_transport_t*)arg; // 检查函数名是否已存在
rpc_request_t request;
rpc_response_t response; for (int i = 0; i < handler_count; i++) {
int ret;
if (strcmp(handlers[i].func_name, func_name) == 0) {
while (1) {
// 接收请求 return RPC_ERROR;
ret = rpc_recv_request(transport, &request);
if (ret != RPC_SUCCESS) { }
printf("Failed to receive request: %s\n", rpc_error_to_string(ret));
break; }
}
printf("Received request: %s with %d arguments\n", request.func_name, request.args_count);
// 添加新的函数处理器
// 查找函数处理器
rpc_handler_t* handler = rpc_find_handler(request.func_name); strncpy(handlers[handler_count].func_name, func_name, MAX_FUNC_NAME_LEN - 1);
if (!handler) {
printf("Function not found: %s\n", request.func_name); handlers[handler_count].func_name[MAX_FUNC_NAME_LEN - 1] = '\0';
response.result_code = RPC_FUNC_NOT_FOUND;
response.return_type = RPC_TYPE_VOID; handlers[handler_count].handler = handler;
rpc_send_response(transport, &response);
rpc_free_request(&request); handlers[handler_count].return_type = return_type;
continue;
} handlers[handler_count].param_count = param_count;
// 检查参数数量
if (request.args_count != handler->param_count) {
printf("Invalid argument count for %s\n", request.func_name); // 复制参数类型
response.result_code = RPC_INVALID_ARGS;
response.return_type = RPC_TYPE_VOID; if (param_count > 0 && param_types) {
rpc_send_response(transport, &response);
rpc_free_request(&request); memcpy(handlers[handler_count].param_types, param_types,
continue;
} sizeof(rpc_param_type_t) * param_count);
// 执行函数 }
rpc_param_t return_value;
rpc_init_param(&return_value, handler->return_type);
ret = handler->handler(request.args, request.args_count, &return_value); handler_count++;
// 设置响应 printf("Registered function: %s\n", func_name);
response.result_code = ret;
response.return_type = handler->return_type; return RPC_SUCCESS;
// 复制返回值 }
switch (handler->return_type) {
case RPC_TYPE_INT:
response.return_value.int_val = return_value.value.int_val;
break; /*
case RPC_TYPE_FLOAT:
response.return_value.float_val = return_value.value.float_val; *
break;
case RPC_TYPE_DOUBLE: */
response.return_value.double_val = return_value.value.double_val;
break; rpc_handler_t* rpc_find_handler(const char* func_name) {
case RPC_TYPE_STRING:
// 注意:这里需要复制字符串,避免内存问题 for (int i = 0; i < handler_count; i++) {
response.return_value.string_val = strdup(return_value.value.string_val);
break; if (strcmp(handlers[i].func_name, func_name) == 0) {
case RPC_TYPE_BOOL:
response.return_value.bool_val = return_value.value.bool_val; return &handlers[i];
break;
case RPC_TYPE_VOID: }
default:
break; }
}
return NULL;
// 发送响应
rpc_send_response(transport, &response); }
// 释放资源
rpc_free_request(&request);
if (handler->return_type == RPC_TYPE_STRING && return_value.value.string_val) { /*
free((void*)return_value.value.string_val);
} *
if (response.return_type == RPC_TYPE_STRING && response.return_value.string_val) {
free((void*)response.return_value.string_val); */
}
} #ifdef _WIN32
// 关闭连接 DWORD WINAPI handle_client(LPVOID arg) {
rpc_transport_close(transport);
free(transport); #else
#ifdef _WIN32 void* handle_client(void* arg) {
return 0;
#else #endif
return NULL;
#endif rpc_transport_t* transport = (rpc_transport_t*)arg;
}
rpc_request_t request;
/*
* rpc_response_t response;
*/
int add_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) { int ret;
if (args_count < 2 || params[0].type != RPC_TYPE_INT || params[1].type != RPC_TYPE_INT) {
return RPC_INVALID_ARGS;
}
while (1) {
int a = params[0].value.int_val;
int b = params[1].value.int_val; // 接收请求
return_value->value.int_val = a + b; ret = rpc_recv_request(transport, &request);
printf("Executed add(%d, %d) = %d\n", a, b, return_value->value.int_val); if (ret != RPC_SUCCESS) {
return RPC_SUCCESS;
} printf("Failed to receive request: %s\n", rpc_error_to_string(ret));
/* break;
*
*/ }
int get_server_info_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
const char* info = "RPC Server v1.0";
return_value->value.string_val = strdup(info);
printf("Received request: %s with %d arguments\n", request.func_name, request.args_count);
printf("Executed get_server_info()\n");
return RPC_SUCCESS;
}
// 查找函数处理器
/*
* rpc_handler_t* handler = rpc_find_handler(request.func_name);
*/
int main(int argc, char* argv[]) { if (!handler) {
rpc_server_t server;
int port = 8080; printf("Function not found: %s\n", request.func_name);
#ifdef _WIN32 response.result_code = RPC_FUNC_NOT_FOUND;
// 初始化Windows套接字库
if (rpc_winsock_init() != RPC_SUCCESS) { response.return_type = RPC_TYPE_VOID;
printf("Failed to initialize Winsock\n");
return 1; rpc_send_response(transport, &response);
}
#endif rpc_free_request(&request);
// 注册示例函数 continue;
rpc_param_type_t add_params[] = {RPC_TYPE_INT, RPC_TYPE_INT};
rpc_register_function("add", add_handler, RPC_TYPE_INT, add_params, 2); }
rpc_register_function("get_server_info", get_server_info_handler, RPC_TYPE_STRING, NULL, 0);
// 初始化服务器
if (rpc_server_init(&server, "0.0.0.0", port, 5) != RPC_SUCCESS) { // 检查参数数量
printf("Failed to initialize server\n");
return 1; if (request.args_count != handler->param_count) {
}
printf("Invalid argument count for %s\n", request.func_name);
printf("RPC Server listening on port %d\n", port);
printf("Waiting for client connections...\n"); response.result_code = RPC_INVALID_ARGS;
while (1) { response.return_type = RPC_TYPE_VOID;
// 接受客户端连接
rpc_transport_t* client_transport = (rpc_transport_t*)malloc(sizeof(rpc_transport_t)); rpc_send_response(transport, &response);
if (!client_transport) {
printf("Failed to allocate memory\n"); rpc_free_request(&request);
continue;
} continue;
if (rpc_server_accept(&server, client_transport) != RPC_SUCCESS) { }
free(client_transport);
continue;
}
// 执行函数
// 创建线程处理客户端请求
#ifdef _WIN32 rpc_param_t return_value;
HANDLE thread_handle = CreateThread(
NULL, // 默认安全属性 rpc_init_param(&return_value, handler->return_type);
0, // 默认堆栈大小
handle_client, // 线程函数
client_transport, // 线程参数
0, // 默认创建标志 ret = handler->handler(request.args, request.args_count, &return_value);
NULL // 线程ID(不需要)
);
if (thread_handle == NULL) {
fprintf(stderr, "CreateThread failed with error code: %d\n", GetLastError()); // 设置响应
rpc_transport_close(client_transport);
free(client_transport); response.result_code = ret;
continue;
} response.return_type = handler->return_type;
// 关闭线程句柄但不终止线程,让线程自动结束
CloseHandle(thread_handle);
#else // 复制返回值
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, handle_client, client_transport) != 0) { switch (handler->return_type) {
perror("pthread_create failed");
rpc_transport_close(client_transport); case RPC_TYPE_INT:
free(client_transport);
continue; response.return_value.int_val = return_value.value.int_val;
}
break;
// 分离线程,自动回收资源
pthread_detach(thread_id); case RPC_TYPE_FLOAT:
#endif
} response.return_value.float_val = return_value.value.float_val;
// 关闭服务器 break;
rpc_server_close(&server);
case RPC_TYPE_DOUBLE:
#ifdef _WIN32
// 清理Windows套接字库 response.return_value.double_val = return_value.value.double_val;
rpc_winsock_cleanup();
#endif break;
return 0; case RPC_TYPE_STRING:
// 注意:这里需要复制字符串,避免内存问题
response.return_value.string_val = strdup(return_value.value.string_val);
break;
case RPC_TYPE_BOOL:
response.return_value.bool_val = return_value.value.bool_val;
break;
case RPC_TYPE_VOID:
default:
break;
}
// 发送响应
rpc_send_response(transport, &response);
// 释放资源
rpc_free_request(&request);
if (handler->return_type == RPC_TYPE_STRING && return_value.value.string_val) {
free((void*)return_value.value.string_val);
}
if (response.return_type == RPC_TYPE_STRING && response.return_value.string_val) {
free((void*)response.return_value.string_val);
}
}
// 关闭连接
rpc_transport_close(transport);
free(transport);
#ifdef _WIN32
return 0;
#else
return NULL;
#endif
}
/*
*
*/
int add_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
if (args_count < 2 || params[0].type != RPC_TYPE_INT || params[1].type != RPC_TYPE_INT) {
return RPC_INVALID_ARGS;
}
int a = params[0].value.int_val;
int b = params[1].value.int_val;
return_value->value.int_val = a + b;
printf("Executed add(%d, %d) = %d\n", a, b, return_value->value.int_val);
return RPC_SUCCESS;
}
/*
*
*/
int get_server_info_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
const char* info = "RPC Server v1.0";
return_value->value.string_val = strdup(info);
printf("Executed get_server_info()\n");
return RPC_SUCCESS;
}
/*
*
*/
int main(int argc, char* argv[]) {
rpc_server_t server;
int port = 8080;
#ifdef _WIN32
// 初始化Windows套接字库
if (rpc_winsock_init() != RPC_SUCCESS) {
printf("Failed to initialize Winsock\n");
return 1;
}
#endif
// 注册示例函数
rpc_param_type_t add_params[] = {RPC_TYPE_INT, RPC_TYPE_INT};
rpc_register_function("add", add_handler, RPC_TYPE_INT, add_params, 2);
rpc_register_function("get_server_info", get_server_info_handler, RPC_TYPE_STRING, NULL, 0);
// 初始化服务器
if (rpc_server_init(&server, "0.0.0.0", port, 5) != RPC_SUCCESS) {
printf("Failed to initialize server\n");
return 1;
}
printf("RPC Server listening on port %d\n", port);
printf("Waiting for client connections...\n");
while (1) {
// 接受客户端连接
rpc_transport_t* client_transport = (rpc_transport_t*)malloc(sizeof(rpc_transport_t));
if (!client_transport) {
printf("Failed to allocate memory\n");
continue;
}
if (rpc_server_accept(&server, client_transport) != RPC_SUCCESS) {
free(client_transport);
continue;
}
// 创建线程处理客户端请求
#ifdef _WIN32
HANDLE thread_handle = CreateThread(
NULL, // 默认安全属性
0, // 默认堆栈大小
handle_client, // 线程函数
client_transport, // 线程参数
0, // 默认创建标志
NULL // 线程ID(不需要)
);
if (thread_handle == NULL) {
fprintf(stderr, "CreateThread failed with error code: %d\n", GetLastError());
rpc_transport_close(client_transport);
free(client_transport);
continue;
}
// 关闭线程句柄但不终止线程,让线程自动结束
CloseHandle(thread_handle);
#else
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, handle_client, client_transport) != 0) {
perror("pthread_create failed");
rpc_transport_close(client_transport);
free(client_transport);
continue;
}
// 分离线程,自动回收资源
pthread_detach(thread_id);
#endif
}
// 关闭服务器
rpc_server_close(&server);
#ifdef _WIN32
// 清理Windows套接字库
rpc_winsock_cleanup();
#endif
return 0;
} }

678
scalib/src/rpc_transport.c

@ -1,259 +1,421 @@
/* /*
* rpc_transport.c * rpc_transport.c
* RPC传输层的功能 * RPC传输层的功能
*/ */
#include "rpc_transport.h" #include "rpc_transport.h"
#ifdef _WIN32 // 调试测试代码
/* void debug_test_struct_definition() {
* Windows套接字库 rpc_server_t server;
*/ server.address.sin_family = AF_INET; // 测试访问address成员
int rpc_winsock_init() { }
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData); #ifdef _WIN32
if (result != 0) {
fprintf(stderr, "WSAStartup failed: %d\n", result); /*
return RPC_NET_ERROR; * Windows套接字库
} */
return RPC_SUCCESS; int rpc_winsock_init() {
} WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
/* if (result != 0) {
* Windows套接字库 fprintf(stderr, "WSAStartup failed: %d\n", result);
*/ return RPC_NET_ERROR;
void rpc_winsock_cleanup() { }
WSACleanup(); return RPC_SUCCESS;
} }
/* /*
* Windows平台的错误打印函数 * Windows套接字库
*/ */
static void print_windows_error(const char* message) { void rpc_winsock_cleanup() {
int error_code = WSAGetLastError(); WSACleanup();
char* error_text = NULL; }
FormatMessageA(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, #endif
NULL, error_code, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPSTR)&error_text, 0, NULL); /*
if (error_text) { * Windows平台的错误打印函数
fprintf(stderr, "%s: %s\n", message, error_text); */
LocalFree(error_text); static void print_windows_error(const char* message) {
} else { int error_code = WSAGetLastError();
fprintf(stderr, "%s: Error code %d\n", message, error_code); char* error_text = NULL;
} FormatMessageA(
} FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, error_code, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
#define PRINT_ERROR(msg) print_windows_error(msg) (LPSTR)&error_text, 0, NULL);
#else if (error_text) {
/* fprintf(stderr, "%s: %s\n", message, error_text);
* Linux/Unix平台的错误打印函数 LocalFree(error_text);
*/ } else {
#define PRINT_ERROR(msg) perror(msg) fprintf(stderr, "%s: Error code %d\n", message, error_code);
#endif }
}
/*
* RPC服务器 #define PRINT_ERROR(msg) print_windows_error(msg)
*/ #else
int rpc_server_init(rpc_server_t* server, const char* host, uint16_t port, int backlog) { /*
if (!server || !host) { * Linux/Unix平台的错误打印函数
return RPC_INVALID_ARGS; */
} #define PRINT_ERROR(msg) perror(msg)
#endif
// 创建套接字
server->server_fd = socket(AF_INET, SOCK_STREAM, 0); /*
if (server->server_fd == INVALID_SOCKET_VALUE) { * RPC服务器
PRINT_ERROR("socket creation failed"); */
return RPC_NET_ERROR; int rpc_server_init(rpc_server_t* server, const char* host, uint16_t port, int backlog) {
} // 初始化Windows套接字库(如果需要)
if (rpc_winsock_init() != RPC_SUCCESS) {
// 设置套接字选项,允许地址重用 return RPC_NET_ERROR;
int opt = 1; }
#ifdef _WIN32
// Windows不支持SO_REUSEPORT // 创建套接字
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR, server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
&opt, sizeof(opt)) != 0) { if (server->server_fd == INVALID_SOCKET_VALUE) {
#else PRINT_ERROR("socket creation failed");
// Linux/Unix支持SO_REUSEPORT rpc_winsock_cleanup();
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, return RPC_NET_ERROR;
&opt, sizeof(opt)) != 0) { }
#endif
PRINT_ERROR("setsockopt failed"); // 设置套接字选项,允许地址重用
CLOSE_SOCKET(server->server_fd); int opt = 1;
return RPC_NET_ERROR; #ifdef _WIN32
} // Windows不支持SO_REUSEPORT
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR,
// 设置服务器地址结构 &opt, sizeof(opt)) != 0) {
memset(&server->address, 0, sizeof(server->address)); #else
server->address.sin_family = AF_INET; // Linux/Unix支持SO_REUSEPORT
server->address.sin_addr.s_addr = inet_addr(host); if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
server->address.sin_port = htons(port); &opt, sizeof(opt)) != 0) {
#endif
// 绑定地址到套接字 PRINT_ERROR("setsockopt failed");
if (bind(server->server_fd, (struct sockaddr*)&server->address, sizeof(server->address)) != 0) { CLOSE_SOCKET(server->server_fd);
PRINT_ERROR("bind failed"); return RPC_NET_ERROR;
CLOSE_SOCKET(server->server_fd); }
return RPC_NET_ERROR;
} // 设置服务器地址结构
memset(&server->address, 0, sizeof(server->address));
// 开始监听连接 server->address.sin_family = AF_INET;
if (listen(server->server_fd, backlog) != 0) { server->address.sin_addr.s_addr = inet_addr(host);
PRINT_ERROR("listen failed"); server->address.sin_port = htons(port);
CLOSE_SOCKET(server->server_fd);
return RPC_NET_ERROR; // 绑定地址到套接字
} if (bind(server->server_fd, (struct sockaddr*)&server->address, sizeof(server->address)) != 0) {
PRINT_ERROR("bind failed");
printf("RPC Server started on %s:%d\n", host, port); CLOSE_SOCKET(server->server_fd);
return RPC_SUCCESS; return RPC_NET_ERROR;
} }
/* // 开始监听连接
* if (listen(server->server_fd, backlog) != 0) {
*/ PRINT_ERROR("listen failed");
int rpc_server_accept(rpc_server_t* server, rpc_transport_t* transport) { CLOSE_SOCKET(server->server_fd);
if (!server || !transport) { return RPC_NET_ERROR;
return RPC_INVALID_ARGS; }
}
printf("RPC Server started on %s:%d\n", host, port);
socklen_t addrlen = sizeof(transport->address); return RPC_SUCCESS;
transport->socket_fd = accept(server->server_fd, (struct sockaddr*)&transport->address, &addrlen); }
if (transport->socket_fd == INVALID_SOCKET_VALUE) {
PRINT_ERROR("accept failed");
return RPC_NET_ERROR;
} /*
printf("Client connected: %s:%d\n", *
inet_ntoa(transport->address.sin_addr),
ntohs(transport->address.sin_port)); */
return RPC_SUCCESS;
} int rpc_server_accept(rpc_server_t* server, rpc_transport_t* transport) {
/* if (!server || !transport) {
* RPC服务器
*/ return RPC_INVALID_ARGS;
void rpc_server_close(rpc_server_t* server) {
if (!server || server->server_fd == INVALID_SOCKET_VALUE) { }
return;
}
CLOSE_SOCKET(server->server_fd); socklen_t addrlen = sizeof(transport->address);
server->server_fd = INVALID_SOCKET_VALUE;
printf("RPC Server closed\n"); transport->socket_fd = accept(server->server_fd, (struct sockaddr*)&transport->address, &addrlen);
}
if (transport->socket_fd == INVALID_SOCKET_VALUE) {
/*
* RPC客户端传输 PRINT_ERROR("accept failed");
*/
int rpc_client_init(rpc_transport_t* transport, const char* server_host, uint16_t server_port) { return RPC_NET_ERROR;
if (!transport || !server_host) {
return RPC_INVALID_ARGS; }
}
// 创建套接字
transport->socket_fd = socket(AF_INET, SOCK_STREAM, 0); printf("Client connected: %s:%d\n",
if (transport->socket_fd == INVALID_SOCKET_VALUE) {
PRINT_ERROR("socket creation failed"); inet_ntoa(transport->address.sin_addr),
return RPC_NET_ERROR;
} ntohs(transport->address.sin_port));
// 设置服务器地址结构 return RPC_SUCCESS;
memset(&transport->address, 0, sizeof(transport->address));
transport->address.sin_family = AF_INET; }
transport->address.sin_port = htons(server_port);
// 将主机名转换为IP地址
if (inet_pton(AF_INET, server_host, &transport->address.sin_addr) <= 0) { /*
PRINT_ERROR("invalid address");
CLOSE_SOCKET(transport->socket_fd); * RPC服务器
return RPC_NET_ERROR;
} */
// 连接到服务器 void rpc_server_close(rpc_server_t* server) {
if (connect(transport->socket_fd, (struct sockaddr*)&transport->address, sizeof(transport->address)) != 0) {
PRINT_ERROR("connection failed"); if (!server || server->server_fd == INVALID_SOCKET_VALUE) {
CLOSE_SOCKET(transport->socket_fd);
return RPC_NET_ERROR; return;
}
}
printf("Connected to server %s:%d\n", server_host, server_port);
return RPC_SUCCESS;
}
CLOSE_SOCKET(server->server_fd);
/*
* server->server_fd = INVALID_SOCKET_VALUE;
*/
int rpc_transport_send(rpc_transport_t* transport, const void* data, size_t data_size) { printf("RPC Server closed\n");
if (!transport || !data || data_size == 0) {
return RPC_INVALID_ARGS; }
}
// 发送所有数据
size_t sent_bytes = 0; /*
while (sent_bytes < data_size) {
#ifdef _WIN32 * RPC客户端传输
int bytes = send(transport->socket_fd, (const char*)data + sent_bytes, (
#ifdef _WIN64 */
int
#else int rpc_client_init(rpc_transport_t* transport, const char* server_host, uint16_t server_port) {
int
#endif if (!transport || !server_host) {
)(data_size - sent_bytes), 0);
#else return RPC_INVALID_ARGS;
ssize_t bytes = send(transport->socket_fd, (const char*)data + sent_bytes, data_size - sent_bytes, 0);
#endif }
if (bytes <= 0) {
PRINT_ERROR("send failed");
return RPC_NET_ERROR;
} // 创建套接字
sent_bytes += bytes;
} transport->socket_fd = socket(AF_INET, SOCK_STREAM, 0);
return RPC_SUCCESS; if (transport->socket_fd == INVALID_SOCKET_VALUE) {
}
PRINT_ERROR("socket creation failed");
/*
* return RPC_NET_ERROR;
*/
int rpc_transport_recv(rpc_transport_t* transport, void* buffer, size_t buffer_size) { }
if (!transport || !buffer || buffer_size == 0) {
return RPC_INVALID_ARGS;
}
// 设置服务器地址结构
// 接收所有数据
size_t recv_bytes = 0; memset(&transport->address, 0, sizeof(transport->address));
while (recv_bytes < buffer_size) {
#ifdef _WIN32 transport->address.sin_family = AF_INET;
int bytes = recv(transport->socket_fd, (char*)buffer + recv_bytes, (
#ifdef _WIN64 transport->address.sin_port = htons(server_port);
int
#else
int
#endif // 将主机名转换为IP地址
)(buffer_size - recv_bytes), 0);
#else if (inet_pton(AF_INET, server_host, &transport->address.sin_addr) <= 0) {
ssize_t bytes = recv(transport->socket_fd, (char*)buffer + recv_bytes, buffer_size - recv_bytes, 0);
#endif PRINT_ERROR("invalid address");
if (bytes < 0) {
PRINT_ERROR("recv failed"); CLOSE_SOCKET(transport->socket_fd);
return RPC_NET_ERROR;
} else if (bytes == 0) { return RPC_NET_ERROR;
// 连接关闭
return RPC_NET_ERROR; }
}
recv_bytes += bytes;
}
// 连接到服务器
return RPC_SUCCESS;
} if (connect(transport->socket_fd, (struct sockaddr*)&transport->address, sizeof(transport->address)) != 0) {
/* PRINT_ERROR("connection failed");
*
*/ CLOSE_SOCKET(transport->socket_fd);
void rpc_transport_close(rpc_transport_t* transport) {
if (!transport || transport->socket_fd == INVALID_SOCKET_VALUE) { return RPC_NET_ERROR;
return;
} }
CLOSE_SOCKET(transport->socket_fd);
transport->socket_fd = INVALID_SOCKET_VALUE;
printf("Connected to server %s:%d\n", server_host, server_port);
return RPC_SUCCESS;
}
/*
*
*/
int rpc_transport_send(rpc_transport_t* transport, const void* data, size_t data_size) {
if (!transport || !data || data_size == 0) {
return RPC_INVALID_ARGS;
}
// 发送所有数据
size_t sent_bytes = 0;
while (sent_bytes < data_size) {
#ifdef _WIN32
int bytes = send(transport->socket_fd, (const char*)data + sent_bytes, (
#ifdef _WIN64
int
#else
int
#endif
)(data_size - sent_bytes), 0);
#else
ssize_t bytes = send(transport->socket_fd, (const char*)data + sent_bytes, data_size - sent_bytes, 0);
#endif
if (bytes <= 0) {
PRINT_ERROR("send failed");
return RPC_NET_ERROR;
}
sent_bytes += bytes;
}
return RPC_SUCCESS;
}
/*
*
*/
int rpc_transport_recv(rpc_transport_t* transport, void* buffer, size_t buffer_size) {
if (!transport || !buffer || buffer_size == 0) {
return RPC_INVALID_ARGS;
}
// 接收所有数据
size_t recv_bytes = 0;
while (recv_bytes < buffer_size) {
#ifdef _WIN32
int bytes = recv(transport->socket_fd, (char*)buffer + recv_bytes, (
#ifdef _WIN64
int
#else
int
#endif
)(buffer_size - recv_bytes), 0);
#else
ssize_t bytes = recv(transport->socket_fd, (char*)buffer + recv_bytes, buffer_size - recv_bytes, 0);
#endif
if (bytes < 0) {
PRINT_ERROR("recv failed");
return RPC_NET_ERROR;
} else if (bytes == 0) {
// 连接关闭
return RPC_NET_ERROR;
}
recv_bytes += bytes;
}
return RPC_SUCCESS;
}
/*
*
*/
void rpc_transport_close(rpc_transport_t* transport) {
if (!transport || transport->socket_fd == INVALID_SOCKET_VALUE) {
return;
}
CLOSE_SOCKET(transport->socket_fd);
transport->socket_fd = INVALID_SOCKET_VALUE;
} }

40
scalib/test_cmake_config.cmd

@ -1,40 +0,0 @@
@echo off
REM 测试CMake配置的脚本
REM 设置构建目录
set BUILD_DIR=build_test
REM 清理旧的构建目录
if exist %BUILD_DIR% rmdir /s /q %BUILD_DIR%
REM 创建构建目录
mkdir %BUILD_DIR%
REM 运行CMake配置
pushd %BUILD_DIR%
cmake ..
if %ERRORLEVEL% neq 0 (
echo CMake配置失败!
popd
exit /b 1
)
REM 构建项目
cmake --build . --config Release
if %ERRORLEVEL% neq 0 (
echo 构建失败!
popd
exit /b 1
)
popd
REM 检查bin目录是否存在
if exist bin (
echo bin目录已创建
dir bin
) else (
echo bin目录未创建
)
pause

33
scalib/test_compile.cmd

@ -1,33 +0,0 @@
@echo off
REM 简单的测试编译脚本
REM 设置Visual Studio环境
REM 尝试检测Visual Studio安装路径
set VS170COMNTOOLS=
for /f "delims=" %%i in ('reg query "HKLM\SOFTWARE\Microsoft\VisualStudio\SxS\VS7" /v "17.0" 2^>nul') do (
for /f "tokens=2*" %%j in ("%%i") do set "VS170COMNTOOLS=%%kCommon7\Tools\"
)
REM 如果找不到Visual Studio 2022,尝试其他版本
if not exist "%VS170COMNTOOLS%vsdevcmd.bat" (
echo 无法找到Visual Studio 2022环境
pause
exit /b 1
)
REM 设置环境变量
call "%VS170COMNTOOLS%vsdevcmd.bat" -arch=x64
REM 设置编译选项
set INCLUDE_DIR=include
set SRC_DIR=src
REM 编译服务器
cl.exe /I%INCLUDE_DIR% %SRC_DIR%\rpc_server.c %SRC_DIR%\rpc_common.c %SRC_DIR%\rpc_transport.c %SRC_DIR%\rpc_message.c /link ws2_32.lib /out:rpc_server_test.exe
REM 检查编译结果
if %ERRORLEVEL% EQU 0 (
echo 编译成功!可执行文件: rpc_server_test.exe
) else (
echo 编译失败!
)
Loading…
Cancel
Save