Browse Source

服务端rpc连接组件

master
xyiege 2 months ago
parent
commit
c8dd911488
  1. 37
      scalib/CMakeLists.txt
  2. 81
      scalib/include/rpc_common.h
  3. 58
      scalib/include/rpc_message.h
  4. 50
      scalib/include/rpc_transport.h
  5. 38
      scalib/src/CMakeLists.txt
  6. 230
      scalib/src/rpc_client.c
  7. 97
      scalib/src/rpc_common.c
  8. 191
      scalib/src/rpc_message.c
  9. 250
      scalib/src/rpc_server.c
  10. 185
      scalib/src/rpc_transport.c

37
scalib/CMakeLists.txt

@ -0,0 +1,37 @@
# CMake
cmake_minimum_required(VERSION 3.10)
#
project(rpc_demo VERSION 1.0 LANGUAGES C)
# C
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED ON)
#
message(STATUS "Project: ${PROJECT_NAME}")
message(STATUS "Version: ${PROJECT_VERSION}")
message(STATUS "C Compiler: ${CMAKE_C_COMPILER}")
message(STATUS "C Standard: ${CMAKE_C_STANDARD}")
#
include_directories(${CMAKE_SOURCE_DIR}/include)
#
add_subdirectory(src)
# cmake
list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
#
install(TARGETS rpc_server rpc_client
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)
#
install(DIRECTORY ${CMAKE_SOURCE_DIR}/include/
DESTINATION include/rpc_demo
FILES_MATCHING PATTERN "*.h"
)

81
scalib/include/rpc_common.h

@ -0,0 +1,81 @@
/*
* rpc_common.h
* RPC框架的公共数据结构
*/
#ifndef RPC_COMMON_H
#define RPC_COMMON_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
/* 错误码定义 */
#define RPC_SUCCESS 0 // 成功
#define RPC_ERROR -1 // 一般错误
#define RPC_NET_ERROR -2 // 网络错误
#define RPC_TIMEOUT -3 // 超时错误
#define RPC_INVALID_ARGS -4 // 参数无效
#define RPC_FUNC_NOT_FOUND -5 // 函数未找到
/* 最大消息大小 */
#define MAX_MESSAGE_SIZE 4096
/* 最大函数名长度 */
#define MAX_FUNC_NAME_LEN 64
/* 最大参数数量 */
#define MAX_ARGS_COUNT 16
/* 参数类型枚举 */
typedef enum {
RPC_TYPE_INT,
RPC_TYPE_FLOAT,
RPC_TYPE_DOUBLE,
RPC_TYPE_STRING,
RPC_TYPE_BOOL,
RPC_TYPE_VOID
} rpc_param_type_t;
/* 参数数据结构 */
typedef struct {
rpc_param_type_t type; // 参数类型
union {
int32_t int_val; // 整数值
float float_val; // 浮点值
double double_val; // 双精度值
const char* string_val; // 字符串值
bool bool_val; // 布尔值
} value; // 参数值
} rpc_param_t;
/* RPC请求数据结构 */
typedef struct {
char func_name[MAX_FUNC_NAME_LEN]; // 函数名
int args_count; // 参数数量
rpc_param_t args[MAX_ARGS_COUNT]; // 参数数组
} rpc_request_t;
/* RPC响应数据结构 */
typedef struct {
int result_code; // 结果代码
rpc_param_type_t return_type; // 返回值类型
union {
int32_t int_val; // 整数值
float float_val; // 浮点值
double double_val; // 双精度值
const char* string_val; // 字符串值
bool bool_val; // 布尔值
} return_value; // 返回值
} rpc_response_t;
/* 工具函数声明 */
void rpc_init_param(rpc_param_t* param, rpc_param_type_t type);
void rpc_free_params(rpc_param_t* params, int count);
void rpc_free_request(rpc_request_t* request);
void rpc_free_response(rpc_response_t* response);
const char* rpc_error_to_string(int error_code);
#endif /* RPC_COMMON_H */

58
scalib/include/rpc_message.h

@ -0,0 +1,58 @@
/*
* rpc_message.h
* RPC消息的序列化和反序列化接口
*/
#ifndef RPC_MESSAGE_H
#define RPC_MESSAGE_H
#include "rpc_common.h"
/* 消息类型枚举 */
typedef enum {
RPC_MESSAGE_REQUEST, // 请求消息
RPC_MESSAGE_RESPONSE // 响应消息
} rpc_message_type_t;
/* RPC消息头部 */
typedef struct {
rpc_message_type_t type; // 消息类型
uint32_t payload_size; // 负载大小
} rpc_message_header_t;
/* RPC消息结构 */
typedef struct {
rpc_message_header_t header; // 消息头部
union {
rpc_request_t request; // 请求消息
rpc_response_t response; // 响应消息
} payload; // 消息负载
} rpc_message_t;
/* 消息处理函数声明 */
// 序列化请求消息到缓冲区
int rpc_serialize_request(const rpc_request_t* request, void* buffer, size_t buffer_size);
// 反序列化缓冲区到请求消息
int rpc_deserialize_request(const void* buffer, size_t buffer_size, rpc_request_t* request);
// 序列化响应消息到缓冲区
int rpc_serialize_response(const rpc_response_t* response, void* buffer, size_t buffer_size);
// 反序列化缓冲区到响应消息
int rpc_deserialize_response(const void* buffer, size_t buffer_size, rpc_response_t* response);
// 发送请求消息
int rpc_send_request(rpc_transport_t* transport, const rpc_request_t* request);
// 接收请求消息
int rpc_recv_request(rpc_transport_t* transport, rpc_request_t* request);
// 发送响应消息
int rpc_send_response(rpc_transport_t* transport, const rpc_response_t* response);
// 接收响应消息
int rpc_recv_response(rpc_transport_t* transport, rpc_response_t* response);
#endif /* RPC_MESSAGE_H */

50
scalib/include/rpc_transport.h

@ -0,0 +1,50 @@
/*
* rpc_transport.h
* RPC传输层的接口
*/
#ifndef RPC_TRANSPORT_H
#define RPC_TRANSPORT_H
#include "rpc_common.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
/* 传输上下文 */
typedef struct {
int socket_fd; // 套接字文件描述符
struct sockaddr_in address; // 地址信息
} rpc_transport_t;
/* 服务器上下文 */
typedef struct {
int server_fd; // 服务器套接字文件描述符
struct sockaddr_in address; // 服务器地址信息
} rpc_server_t;
/* 传输层函数声明 */
// 初始化服务器
int rpc_server_init(rpc_server_t* server, const char* host, uint16_t port, int backlog);
// 服务器等待连接
int rpc_server_accept(rpc_server_t* server, rpc_transport_t* transport);
// 关闭服务器
void rpc_server_close(rpc_server_t* server);
// 初始化客户端传输
int rpc_client_init(rpc_transport_t* transport, const char* server_host, uint16_t server_port);
// 发送数据
int rpc_transport_send(rpc_transport_t* transport, const void* data, size_t data_size);
// 接收数据
int rpc_transport_recv(rpc_transport_t* transport, void* buffer, size_t buffer_size);
// 关闭传输连接
void rpc_transport_close(rpc_transport_t* transport);
#endif /* RPC_TRANSPORT_H */

38
scalib/src/CMakeLists.txt

@ -0,0 +1,38 @@
#
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
# RPC
add_library(rpc_common STATIC
rpc_common.c
rpc_message.c
rpc_transport.c
)
# RPC
add_executable(rpc_server
rpc_server.c
)
target_link_libraries(rpc_server
rpc_common
pthread
)
# RPC
add_executable(rpc_client
rpc_client.c
)
target_link_libraries(rpc_client
rpc_common
pthread
)
#
install(TARGETS rpc_common rpc_server rpc_client
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)

230
scalib/src/rpc_client.c

@ -0,0 +1,230 @@
/*
* rpc_client.c
* RPC客户端实现
*/
#include "rpc_common.h"
#include "rpc_transport.h"
#include "rpc_message.h"
/*
* RPC客户端结构
*/
typedef struct {
rpc_transport_t transport;
bool connected;
} rpc_client_t;
/*
* RPC客户端
*/
int rpc_client_create(rpc_client_t* client, const char* server_host, uint16_t server_port) {
if (!client || !server_host) {
return RPC_INVALID_ARGS;
}
memset(client, 0, sizeof(rpc_client_t));
// 连接到服务器
int ret = rpc_client_init(&client->transport, server_host, server_port);
if (ret != RPC_SUCCESS) {
return ret;
}
client->connected = true;
return RPC_SUCCESS;
}
/*
* RPC客户端
*/
void rpc_client_destroy(rpc_client_t* client) {
if (!client) {
return;
}
if (client->connected) {
rpc_transport_close(&client->transport);
client->connected = false;
}
}
/*
*
*/
int rpc_call(rpc_client_t* client, const char* func_name,
rpc_param_t* params, int args_count,
rpc_param_t* return_value) {
if (!client || !func_name || !client->connected) {
return RPC_INVALID_ARGS;
}
if (args_count < 0 || args_count > MAX_ARGS_COUNT) {
return RPC_INVALID_ARGS;
}
// 创建请求
rpc_request_t request;
memset(&request, 0, sizeof(rpc_request_t));
strncpy(request.func_name, func_name, MAX_FUNC_NAME_LEN - 1);
request.func_name[MAX_FUNC_NAME_LEN - 1] = '\0';
request.args_count = args_count;
// 复制参数
if (args_count > 0 && params) {
for (int i = 0; i < args_count; i++) {
request.args[i] = params[i];
// 注意:对于字符串参数,这里只是浅拷贝指针,实际应用中可能需要深拷贝
}
}
// 发送请求
int ret = rpc_send_request(&client->transport, &request);
if (ret != RPC_SUCCESS) {
return ret;
}
// 接收响应
rpc_response_t response;
memset(&response, 0, sizeof(rpc_response_t));
ret = rpc_recv_response(&client->transport, &response);
if (ret != RPC_SUCCESS) {
return ret;
}
// 检查结果代码
if (response.result_code != RPC_SUCCESS) {
return response.result_code;
}
// 复制返回值
if (return_value) {
return_value->type = response.return_type;
switch (response.return_type) {
case RPC_TYPE_INT:
return_value->value.int_val = response.return_value.int_val;
break;
case RPC_TYPE_FLOAT:
return_value->value.float_val = response.return_value.float_val;
break;
case RPC_TYPE_DOUBLE:
return_value->value.double_val = response.return_value.double_val;
break;
case RPC_TYPE_STRING:
// 注意:这里需要复制字符串,避免内存问题
if (response.return_value.string_val) {
return_value->value.string_val = strdup(response.return_value.string_val);
} else {
return_value->value.string_val = NULL;
}
break;
case RPC_TYPE_BOOL:
return_value->value.bool_val = response.return_value.bool_val;
break;
case RPC_TYPE_VOID:
default:
break;
}
}
// 释放响应中的资源
if (response.return_type == RPC_TYPE_STRING && response.return_value.string_val) {
free((void*)response.return_value.string_val);
}
return RPC_SUCCESS;
}
/*
*
*/
int demo_add(rpc_client_t* client, int a, int b, int* result) {
rpc_param_t params[2];
rpc_param_t return_value;
// 设置参数
rpc_init_param(&params[0], RPC_TYPE_INT);
params[0].value.int_val = a;
rpc_init_param(&params[1], RPC_TYPE_INT);
params[1].value.int_val = b;
// 设置返回值
rpc_init_param(&return_value, RPC_TYPE_INT);
// 调用远程函数
int ret = rpc_call(client, "add", params, 2, &return_value);
if (ret == RPC_SUCCESS && result) {
*result = return_value.value.int_val;
}
return ret;
}
/*
*
*/
int demo_get_server_info(rpc_client_t* client, char** info) {
rpc_param_t return_value;
// 设置返回值
rpc_init_param(&return_value, RPC_TYPE_STRING);
// 调用远程函数
int ret = rpc_call(client, "get_server_info", NULL, 0, &return_value);
if (ret == RPC_SUCCESS && info) {
*info = strdup(return_value.value.string_val);
}
// 释放返回值中的资源
if (return_value.type == RPC_TYPE_STRING && return_value.value.string_val) {
free((void*)return_value.value.string_val);
}
return ret;
}
/*
*
*/
int main(int argc, char* argv[]) {
rpc_client_t client;
const char* server_host = "127.0.0.1";
uint16_t server_port = 8080;
// 初始化客户端
printf("Connecting to RPC server at %s:%d...\n", server_host, server_port);
int ret = rpc_client_create(&client, server_host, server_port);
if (ret != RPC_SUCCESS) {
printf("Failed to connect to server: %s\n", rpc_error_to_string(ret));
return 1;
}
printf("Connected to server successfully\n");
// 测试远程加法函数
int a = 10, b = 20, sum;
ret = demo_add(&client, a, b, &sum);
if (ret == RPC_SUCCESS) {
printf("Remote call: %d + %d = %d\n", a, b, sum);
} else {
printf("Failed to call 'add': %s\n", rpc_error_to_string(ret));
}
// 测试获取服务器信息
char* server_info = NULL;
ret = demo_get_server_info(&client, &server_info);
if (ret == RPC_SUCCESS && server_info) {
printf("Server info: %s\n", server_info);
free(server_info);
} else {
printf("Failed to call 'get_server_info': %s\n", rpc_error_to_string(ret));
}
// 清理资源
rpc_client_destroy(&client);
printf("Client disconnected\n");
return 0;
}

97
scalib/src/rpc_common.c

@ -0,0 +1,97 @@
/*
* rpc_common.c
* RPC框架的公共工具函数
*/
#include "rpc_common.h"
/*
* RPC参数
*/
void rpc_init_param(rpc_param_t* param, rpc_param_type_t type) {
if (!param) return;
param->type = type;
switch (type) {
case RPC_TYPE_INT:
param->value.int_val = 0;
break;
case RPC_TYPE_FLOAT:
param->value.float_val = 0.0f;
break;
case RPC_TYPE_DOUBLE:
param->value.double_val = 0.0;
break;
case RPC_TYPE_STRING:
param->value.string_val = NULL;
break;
case RPC_TYPE_BOOL:
param->value.bool_val = false;
break;
case RPC_TYPE_VOID:
default:
break;
}
}
/*
*
*/
void rpc_free_params(rpc_param_t* params, int count) {
if (!params || count <= 0) return;
for (int i = 0; i < count; i++) {
if (params[i].type == RPC_TYPE_STRING && params[i].value.string_val) {
free((void*)params[i].value.string_val);
params[i].value.string_val = NULL;
}
}
}
/*
*
*/
void rpc_free_request(rpc_request_t* request) {
if (!request) return;
rpc_free_params(request->args, request->args_count);
request->args_count = 0;
request->func_name[0] = '\0';
}
/*
*
*/
void rpc_free_response(rpc_response_t* response) {
if (!response) return;
if (response->return_type == RPC_TYPE_STRING && response->return_value.string_val) {
free((void*)response->return_value.string_val);
response->return_value.string_val = NULL;
}
response->result_code = 0;
response->return_type = RPC_TYPE_VOID;
}
/*
*
*/
const char* rpc_error_to_string(int error_code) {
switch (error_code) {
case RPC_SUCCESS:
return "Success";
case RPC_ERROR:
return "General error";
case RPC_NET_ERROR:
return "Network error";
case RPC_TIMEOUT:
return "Timeout error";
case RPC_INVALID_ARGS:
return "Invalid arguments";
case RPC_FUNC_NOT_FOUND:
return "Function not found";
default:
return "Unknown error";
}
}

191
scalib/src/rpc_message.c

@ -0,0 +1,191 @@
/*
* rpc_message.c
* RPC消息的序列化和反序列化功能
*/
#include "rpc_message.h"
#include "rpc_transport.h"
/*
*
*/
int rpc_serialize_request(const rpc_request_t* request, void* buffer, size_t buffer_size) {
if (!request || !buffer || buffer_size < sizeof(rpc_request_t)) {
return RPC_INVALID_ARGS;
}
// 复制请求消息到缓冲区
memcpy(buffer, request, sizeof(rpc_request_t));
// 注意:这里简化了字符串参数的序列化,实际上可能需要更复杂的处理
// 在实际应用中,可能需要计算字符串的总长度并动态分配足够的缓冲区
return RPC_SUCCESS;
}
/*
*
*/
int rpc_deserialize_request(const void* buffer, size_t buffer_size, rpc_request_t* request) {
if (!buffer || !request || buffer_size < sizeof(rpc_request_t)) {
return RPC_INVALID_ARGS;
}
// 从缓冲区复制请求消息
memcpy(request, buffer, sizeof(rpc_request_t));
// 注意:这里简化了字符串参数的反序列化
// 在实际应用中,可能需要处理字符串的动态分配和复制
return RPC_SUCCESS;
}
/*
*
*/
int rpc_serialize_response(const rpc_response_t* response, void* buffer, size_t buffer_size) {
if (!response || !buffer || buffer_size < sizeof(rpc_response_t)) {
return RPC_INVALID_ARGS;
}
// 复制响应消息到缓冲区
memcpy(buffer, response, sizeof(rpc_response_t));
// 注意:这里简化了字符串返回值的序列化
return RPC_SUCCESS;
}
/*
*
*/
int rpc_deserialize_response(const void* buffer, size_t buffer_size, rpc_response_t* response) {
if (!buffer || !response || buffer_size < sizeof(rpc_response_t)) {
return RPC_INVALID_ARGS;
}
// 从缓冲区复制响应消息
memcpy(response, buffer, sizeof(rpc_response_t));
// 注意:这里简化了字符串返回值的反序列化
return RPC_SUCCESS;
}
/*
*
*/
int rpc_send_request(rpc_transport_t* transport, const rpc_request_t* request) {
if (!transport || !request) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_t message;
// 设置消息头部
message.header.type = RPC_MESSAGE_REQUEST;
message.header.payload_size = sizeof(rpc_request_t);
// 复制请求到消息负载
message.payload.request = *request;
// 序列化整个消息
size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size;
if (message_size > MAX_MESSAGE_SIZE) {
return RPC_ERROR;
}
memcpy(buffer, &message, message_size);
// 发送消息
return rpc_transport_send(transport, buffer, message_size);
}
/*
*
*/
int rpc_recv_request(rpc_transport_t* transport, rpc_request_t* request) {
if (!transport || !request) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_header_t header;
// 接收消息头部
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
// 检查消息类型和大小
if (header.type != RPC_MESSAGE_REQUEST || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) {
return RPC_ERROR;
}
// 接收消息负载
if (rpc_transport_recv(transport, request, header.payload_size) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
return RPC_SUCCESS;
}
/*
*
*/
int rpc_send_response(rpc_transport_t* transport, const rpc_response_t* response) {
if (!transport || !response) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_t message;
// 设置消息头部
message.header.type = RPC_MESSAGE_RESPONSE;
message.header.payload_size = sizeof(rpc_response_t);
// 复制响应到消息负载
message.payload.response = *response;
// 序列化整个消息
size_t message_size = sizeof(rpc_message_header_t) + message.header.payload_size;
if (message_size > MAX_MESSAGE_SIZE) {
return RPC_ERROR;
}
memcpy(buffer, &message, message_size);
// 发送消息
return rpc_transport_send(transport, buffer, message_size);
}
/*
*
*/
int rpc_recv_response(rpc_transport_t* transport, rpc_response_t* response) {
if (!transport || !response) {
return RPC_INVALID_ARGS;
}
uint8_t buffer[MAX_MESSAGE_SIZE];
rpc_message_header_t header;
// 接收消息头部
if (rpc_transport_recv(transport, &header, sizeof(rpc_message_header_t)) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
// 检查消息类型和大小
if (header.type != RPC_MESSAGE_RESPONSE || header.payload_size > MAX_MESSAGE_SIZE - sizeof(rpc_message_header_t)) {
return RPC_ERROR;
}
// 接收消息负载
if (rpc_transport_recv(transport, response, header.payload_size) != RPC_SUCCESS) {
return RPC_NET_ERROR;
}
return RPC_SUCCESS;
}

250
scalib/src/rpc_server.c

@ -0,0 +1,250 @@
/*
* rpc_server.c
* RPC服务器实现
*/
#include "rpc_common.h"
#include "rpc_transport.h"
#include "rpc_message.h"
#include <pthread.h>
/* 函数处理函数类型定义 */
typedef int (*rpc_handler_func_t)(rpc_param_t* params, int args_count, rpc_param_t* return_value);
/* 函数处理器结构 */
typedef struct {
char func_name[MAX_FUNC_NAME_LEN];
rpc_handler_func_t handler;
rpc_param_type_t return_type;
rpc_param_type_t param_types[MAX_ARGS_COUNT];
int param_count;
} rpc_handler_t;
/* 函数处理器表 */
static rpc_handler_t handlers[MAX_FUNC_NAME_LEN];
static int handler_count = 0;
/*
*
*/
int rpc_register_function(const char* func_name, rpc_handler_func_t handler,
rpc_param_type_t return_type,
rpc_param_type_t* param_types, int param_count) {
if (!func_name || !handler || param_count < 0 || param_count > MAX_ARGS_COUNT) {
return RPC_INVALID_ARGS;
}
if (handler_count >= MAX_FUNC_NAME_LEN) {
return RPC_ERROR;
}
// 检查函数名是否已存在
for (int i = 0; i < handler_count; i++) {
if (strcmp(handlers[i].func_name, func_name) == 0) {
return RPC_ERROR;
}
}
// 添加新的函数处理器
strncpy(handlers[handler_count].func_name, func_name, MAX_FUNC_NAME_LEN - 1);
handlers[handler_count].func_name[MAX_FUNC_NAME_LEN - 1] = '\0';
handlers[handler_count].handler = handler;
handlers[handler_count].return_type = return_type;
handlers[handler_count].param_count = param_count;
// 复制参数类型
if (param_count > 0 && param_types) {
memcpy(handlers[handler_count].param_types, param_types,
sizeof(rpc_param_type_t) * param_count);
}
handler_count++;
printf("Registered function: %s\n", func_name);
return RPC_SUCCESS;
}
/*
*
*/
rpc_handler_t* rpc_find_handler(const char* func_name) {
for (int i = 0; i < handler_count; i++) {
if (strcmp(handlers[i].func_name, func_name) == 0) {
return &handlers[i];
}
}
return NULL;
}
/*
*
*/
void* handle_client(void* arg) {
rpc_transport_t* transport = (rpc_transport_t*)arg;
rpc_request_t request;
rpc_response_t response;
int ret;
while (1) {
// 接收请求
ret = rpc_recv_request(transport, &request);
if (ret != RPC_SUCCESS) {
printf("Failed to receive request: %s\n", rpc_error_to_string(ret));
break;
}
printf("Received request: %s with %d arguments\n", request.func_name, request.args_count);
// 查找函数处理器
rpc_handler_t* handler = rpc_find_handler(request.func_name);
if (!handler) {
printf("Function not found: %s\n", request.func_name);
response.result_code = RPC_FUNC_NOT_FOUND;
response.return_type = RPC_TYPE_VOID;
rpc_send_response(transport, &response);
rpc_free_request(&request);
continue;
}
// 检查参数数量
if (request.args_count != handler->param_count) {
printf("Invalid argument count for %s\n", request.func_name);
response.result_code = RPC_INVALID_ARGS;
response.return_type = RPC_TYPE_VOID;
rpc_send_response(transport, &response);
rpc_free_request(&request);
continue;
}
// 执行函数
rpc_param_t return_value;
rpc_init_param(&return_value, handler->return_type);
ret = handler->handler(request.args, request.args_count, &return_value);
// 设置响应
response.result_code = ret;
response.return_type = handler->return_type;
// 复制返回值
switch (handler->return_type) {
case RPC_TYPE_INT:
response.return_value.int_val = return_value.value.int_val;
break;
case RPC_TYPE_FLOAT:
response.return_value.float_val = return_value.value.float_val;
break;
case RPC_TYPE_DOUBLE:
response.return_value.double_val = return_value.value.double_val;
break;
case RPC_TYPE_STRING:
// 注意:这里需要复制字符串,避免内存问题
response.return_value.string_val = strdup(return_value.value.string_val);
break;
case RPC_TYPE_BOOL:
response.return_value.bool_val = return_value.value.bool_val;
break;
case RPC_TYPE_VOID:
default:
break;
}
// 发送响应
rpc_send_response(transport, &response);
// 释放资源
rpc_free_request(&request);
if (handler->return_type == RPC_TYPE_STRING && return_value.value.string_val) {
free((void*)return_value.value.string_val);
}
if (response.return_type == RPC_TYPE_STRING && response.return_value.string_val) {
free((void*)response.return_value.string_val);
}
}
// 关闭连接
rpc_transport_close(transport);
free(transport);
return NULL;
}
/*
*
*/
int add_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
if (args_count < 2 || params[0].type != RPC_TYPE_INT || params[1].type != RPC_TYPE_INT) {
return RPC_INVALID_ARGS;
}
int a = params[0].value.int_val;
int b = params[1].value.int_val;
return_value->value.int_val = a + b;
printf("Executed add(%d, %d) = %d\n", a, b, return_value->value.int_val);
return RPC_SUCCESS;
}
/*
*
*/
int get_server_info_handler(rpc_param_t* params, int args_count, rpc_param_t* return_value) {
const char* info = "RPC Server v1.0";
return_value->value.string_val = strdup(info);
printf("Executed get_server_info()\n");
return RPC_SUCCESS;
}
/*
*
*/
int main(int argc, char* argv[]) {
rpc_server_t server;
int port = 8080;
// 注册示例函数
rpc_param_type_t add_params[] = {RPC_TYPE_INT, RPC_TYPE_INT};
rpc_register_function("add", add_handler, RPC_TYPE_INT, add_params, 2);
rpc_register_function("get_server_info", get_server_info_handler, RPC_TYPE_STRING, NULL, 0);
// 初始化服务器
if (rpc_server_init(&server, "0.0.0.0", port, 5) != RPC_SUCCESS) {
printf("Failed to initialize server\n");
return 1;
}
printf("RPC Server listening on port %d\n", port);
printf("Waiting for client connections...\n");
while (1) {
// 接受客户端连接
rpc_transport_t* client_transport = (rpc_transport_t*)malloc(sizeof(rpc_transport_t));
if (!client_transport) {
printf("Failed to allocate memory\n");
continue;
}
if (rpc_server_accept(&server, client_transport) != RPC_SUCCESS) {
free(client_transport);
continue;
}
// 创建线程处理客户端请求
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, handle_client, client_transport) != 0) {
perror("pthread_create failed");
rpc_transport_close(client_transport);
free(client_transport);
continue;
}
// 分离线程,自动回收资源
pthread_detach(thread_id);
}
// 关闭服务器
rpc_server_close(&server);
return 0;
}

185
scalib/src/rpc_transport.c

@ -0,0 +1,185 @@
/*
* rpc_transport.c
* RPC传输层的功能
*/
#include "rpc_transport.h"
/*
* RPC服务器
*/
int rpc_server_init(rpc_server_t* server, const char* host, uint16_t port, int backlog) {
if (!server || !host) {
return RPC_INVALID_ARGS;
}
// 创建套接字
server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server->server_fd < 0) {
perror("socket creation failed");
return RPC_NET_ERROR;
}
// 设置套接字选项,允许地址重用
int opt = 1;
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt))) {
perror("setsockopt failed");
close(server->server_fd);
return RPC_NET_ERROR;
}
// 设置服务器地址结构
memset(&server->address, 0, sizeof(server->address));
server->address.sin_family = AF_INET;
server->address.sin_addr.s_addr = inet_addr(host);
server->address.sin_port = htons(port);
// 绑定地址到套接字
if (bind(server->server_fd, (struct sockaddr*)&server->address, sizeof(server->address)) < 0) {
perror("bind failed");
close(server->server_fd);
return RPC_NET_ERROR;
}
// 开始监听连接
if (listen(server->server_fd, backlog) < 0) {
perror("listen failed");
close(server->server_fd);
return RPC_NET_ERROR;
}
printf("RPC Server started on %s:%d\n", host, port);
return RPC_SUCCESS;
}
/*
*
*/
int rpc_server_accept(rpc_server_t* server, rpc_transport_t* transport) {
if (!server || !transport) {
return RPC_INVALID_ARGS;
}
socklen_t addrlen = sizeof(transport->address);
transport->socket_fd = accept(server->server_fd, (struct sockaddr*)&transport->address, &addrlen);
if (transport->socket_fd < 0) {
perror("accept failed");
return RPC_NET_ERROR;
}
printf("Client connected: %s:%d\n",
inet_ntoa(transport->address.sin_addr),
ntohs(transport->address.sin_port));
return RPC_SUCCESS;
}
/*
* RPC服务器
*/
void rpc_server_close(rpc_server_t* server) {
if (!server || server->server_fd < 0) {
return;
}
close(server->server_fd);
server->server_fd = -1;
printf("RPC Server closed\n");
}
/*
* RPC客户端传输
*/
int rpc_client_init(rpc_transport_t* transport, const char* server_host, uint16_t server_port) {
if (!transport || !server_host) {
return RPC_INVALID_ARGS;
}
// 创建套接字
transport->socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (transport->socket_fd < 0) {
perror("socket creation failed");
return RPC_NET_ERROR;
}
// 设置服务器地址结构
memset(&transport->address, 0, sizeof(transport->address));
transport->address.sin_family = AF_INET;
transport->address.sin_port = htons(server_port);
// 将主机名转换为IP地址
if (inet_pton(AF_INET, server_host, &transport->address.sin_addr) <= 0) {
perror("invalid address");
close(transport->socket_fd);
return RPC_NET_ERROR;
}
// 连接到服务器
if (connect(transport->socket_fd, (struct sockaddr*)&transport->address, sizeof(transport->address)) < 0) {
perror("connection failed");
close(transport->socket_fd);
return RPC_NET_ERROR;
}
printf("Connected to server %s:%d\n", server_host, server_port);
return RPC_SUCCESS;
}
/*
*
*/
int rpc_transport_send(rpc_transport_t* transport, const void* data, size_t data_size) {
if (!transport || !data || data_size == 0) {
return RPC_INVALID_ARGS;
}
// 发送所有数据
size_t sent_bytes = 0;
while (sent_bytes < data_size) {
ssize_t bytes = send(transport->socket_fd, (const char*)data + sent_bytes, data_size - sent_bytes, 0);
if (bytes < 0) {
perror("send failed");
return RPC_NET_ERROR;
}
sent_bytes += bytes;
}
return RPC_SUCCESS;
}
/*
*
*/
int rpc_transport_recv(rpc_transport_t* transport, void* buffer, size_t buffer_size) {
if (!transport || !buffer || buffer_size == 0) {
return RPC_INVALID_ARGS;
}
// 接收所有数据
size_t recv_bytes = 0;
while (recv_bytes < buffer_size) {
ssize_t bytes = recv(transport->socket_fd, (char*)buffer + recv_bytes, buffer_size - recv_bytes, 0);
if (bytes < 0) {
perror("recv failed");
return RPC_NET_ERROR;
} else if (bytes == 0) {
// 连接关闭
return RPC_NET_ERROR;
}
recv_bytes += bytes;
}
return RPC_SUCCESS;
}
/*
*
*/
void rpc_transport_close(rpc_transport_t* transport) {
if (!transport || transport->socket_fd < 0) {
return;
}
close(transport->socket_fd);
transport->socket_fd = -1;
}
Loading…
Cancel
Save