当前位置:首页 > 问答 > 正文

用C语言搞定Redis队列那点事,怎么实现和用法分享

(来源:CSDN博客《Redis入门教程》)用C语言搞定Redis队列那点事,其实说白了就是把Redis的那个列表(List)数据类型,当成一个队列来用,队列嘛,就是先进先出,跟我们平时排队买东西一个道理,最早排队的最先买到,下面我就直接说说怎么用C语言的hiredis这个客户端库来实现它。

你得确保你的开发环境里有了hiredis库。(来源:hiredis GitHub官方文档)没有的话,得先去下载编译安装一下,这个库是Redis官方推荐的C语言客户端,用起来比较直接。

基本思路

Redis的列表,一头是左边(left),一头是右边(right),我们通常用LPUSH命令从左边塞进去一个元素,用RPOP命令从右边吐出来一个元素,这样就构成了一个队列,你也可以反过来,用RPUSHLPOP,看你的习惯,只要保证进去和出来的方向是反着的就行。

代码实现(生产者-消费者例子)

我们来写个简单的例子,一个程序往队列里放东西(生产者),另一个程序从队列里取东西(消费者)。

连接Redis

无论你是生产还是消费,第一步都是先连上Redis服务器。

#include <stdio.h>
#include <stdlib.h>
#include <hiredis/hiredis.h>
redisContext* connect_redis(const char* host, int port) {
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    redisContext *c = redisConnectWithTimeout(host, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    printf("Connected to Redis successfully!\n");
    return c;
}

(来源:hiredis自带的示例代码)这个函数就是用来建立连接的,host是服务器IP,127.0.0.1",port是端口,默认6379。

生产者代码

生产者的活很简单,就是不断地往队列里塞消息。

void producer(redisContext *c, const char* queue_name) {
    int message_id = 0;
    while (1) {
        char message[100];
        sprintf(message, "Message-%d", message_id++);
        // 用LPUSH命令把消息从列表左边加进去
        redisReply *reply = redisCommand(c, "LPUSH %s %s", queue_name, message);
        if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
            printf("LPUSH failed!\n");
            freeReplyObject(reply);
            break;
        }
        printf("Produced: %s\n", message);
        freeReplyObject(reply);
        sleep(1); // 每秒生产一条,模拟一下
    }
}

这里用了redisCommand函数来发送Redis命令。LPUSH queue_name message就是把消息内容放到名叫queue_name的列表最前面,记得每次用完redisReply都要用freeReplyObject释放掉,不然会内存泄漏。

消费者代码

消费者的活就是从队列另一头拿消息来处理。

void consumer(redisContext *c, const char* queue_name) {
    while (1) {
        // 用BRPOP命令从列表右边取出一个元素,BRPOP是阻塞版的,如果队列为空就等着。
        // 这里的0表示超时时间,0就是一直等下去。
        redisReply *reply = redisCommand(c, "BRPOP %s 0", queue_name);
        if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
            printf("BRPOP failed!\n");
            freeReplyObject(reply);
            break;
        }
        // BRPOP返回的是一个数组,第二个元素才是我们想要的消息内容
        if (reply->elements == 2) {
            printf("Consumed: %s\n", reply->element[1]->str);
        }
        freeReplyObject(reply);
    }
}

(来源:Redis命令参考手册)这里用了BRPOP而不是RPOP,关键区别在于BRPOP是阻塞的,如果队列是空的,RPOP会立刻返回一个nil,然后你的程序就得自己不停地循环去问,这叫忙等待,很浪费CPU,而BRPOP会让你的程序在这里安心睡觉,直到有消息来了或者超时时间到了才唤醒,这样效率高多了。BRPOP queue_name 0里的0就是说不设置超时,永远等。

主函数

把上面这些组合起来。

int main() {
    redisContext *c = connect_redis("127.0.0.1", 6379);
    const char* queue_name = "my_queue";
    // 为了演示,我们假设跑生产者。
    // 实际中,生产者消费者应该是不同的进程,这里你跑一个的时候把另一个注释掉试试。
    producer(c, queue_name);
    // consumer(c, queue_name);
    redisFree(c);
    return 0;
}

你运行的时候,可以开两个终端,一个运行生产者,一个运行消费者,就能看到效果了。

实际用起来要注意的几点

  1. 错误处理:上面的代码为了简单,出错很多直接退出了,真正用的时候你得好好处理各种错误,比如网络断了、Redis服务器重启了,你得能重连。
  2. 持久化:(来源:Redis持久化文档)Redis数据是放在内存里的,万一服务器宕机,内存里的数据就没了,如果你不想丢消息,得配置Redis的持久化(RDB或AOF),这样重启后数据还能恢复。
  3. 多个消费者:你可以启动多个消费者进程,他们都会用BRPOP从同一个队列里取消息,Redis会保证一条消息只会被其中一个消费者拿到,这样就可以轻松实现简单的负载均衡,这叫“竞争消费者”模式。
  4. 不是严格的队列:(来源:Redis数据类型说明)Redis的列表毕竟不是专业消息队列中间件(比如RabbitMQ、Kafka),它没有消息确认机制(Ack),也就是说,消费者用BRPOP拿到消息后,如果处理过程中程序崩溃了,这条消息就永远丢失了,因为Redis认为已经被消费了,对于要求高可靠性的场景,这可能不够。

用C语言借助hiredis操作Redis队列,核心就是那几个列表命令,对于简单的异步任务、解耦系统组件来说,这招又快又方便,但要是你的项目对消息可靠性要求特别高,那可能还得考虑更专业的消息中间件,希望这些直白的解释和代码对你有帮助。

用C语言搞定Redis队列那点事,怎么实现和用法分享