图片

还更省钱!

我之前用的是一套很典型的Web应用技术栈:

  • PostgreSQL负责持久化数据存储
  • Redis负责缓存、发布订阅以及后台任务处理

两个数据库,两个体系需要管理,也意味着多了两处故障风险点。

后来我意识到:PostgreSQL可以做到Redis能做的一切。

于是我彻底移除了Redis,迁移过程是这样的。

一、设置:我使用Redis的目的

在替换之前,Redis主要处理三件事:

1、缓存(使用率70%)

// Cache API responses  
await redis.set(`user:${id}`, JSON.stringify(user), 'EX', 3600);

2、发布订阅(使用率20%)

// Real-time notifications  
redis.publish('notifications', JSON.stringify({ userId, message }));

3、后台消息队列(使用率10%)

// Using Bull/BullMQ  
queue.add('send-email', { to, subject, body });

痛点:

  • 需要备份两个数据库
  • Redis使用内存(规模化时成本很高)
  • Redis持久化机制……很复杂
  • Postgres和Redis之间还存在一次网络跳转开销

二、我为什么考虑替换Redis

原因一:成本

我的Redis配置:

  • AWS ElastiCache(2GB):每月45美元
  • 若扩容至 5GB,每月费用将增至110美元

PostgreSQL:

  • 已付费使用RDS(20GB存储):每月50美元
  • 即便增加5GB数据流量:每月仅需0.5美元

节省成本:每月约100美元

原因二:运行复杂性

使用 Redis:

Postgres backup ✅  
Redis backup ❓ (RDB? AOF? Both?)  
Postgres monitoring ✅  
Redis monitoring ❓  
Postgres failover ✅  
Redis Sentinel/Cluster ❓

不使用Redis:

Postgres backup ✅  
Postgres monitoring ✅  
Postgres failover ✅

系统依赖组件更少。

原因三:数据一致性

经典问题:

// Update database  
await db.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]);  


// Invalidate cache  
await redis.del(`user:${id}`);  


// ⚠️ What if Redis is down?  
// ⚠️ What if this fails?  
// Now cache and DB are out of sync

在PostgreSQL中,这类问题通过事务即可解决。

三、PostgreSQL特性

1、使用非日志表进行缓存

Redis:

await redis.set('session:abc123', JSON.stringify(sessionData), 'EX', 3600);

PostgreSQL:

CREATE UNLOGGED TABLE cache(  
  key TEXT PRIMARY KEY,  
  value JSONB NOT NULL,  
  expires_at TIMESTAMPTZ NOT NULL  
);  


CREATE INDEX idx_cache_expires ON cache(expires_at);

插入:

INSERT INTO cache (key, value, expires_at)  
VALUES ($1, $2, NOW() + INTERVAL '1 hour')  
ON CONFLICT (key) DO UPDATE  
  SET value = EXCLUDED.value,  
      expires_at = EXCLUDED.expires_at;

读:

SELECT value FROM cache  
WHERE key = $1 AND expires_at > NOW();

清理(定期运行):

DELETE FROM cache WHERE expires_at < NOW();

什么是非日志表?

  • 跳过预写式日志(WAL)
  • 写入性能大幅提升
  • 崩溃后数据不保留(非常适合用作缓存!)

表现:

Redis SET: 0.05ms  
Postgres UNLOGGED INSERT: 0.08ms

用作缓存已经完全够用。

2、基于LISTEN或NOTIFY实现发布订阅功能

接下来就精彩了。

PostgreSQL具有原生的发布订阅功能,但大多数开发人员并不了解。

1)Redis的发布订阅功能

// Publisher  
redis.publish('notifications', JSON.stringify({ userId: 123, msg: 'Hello' }));  


// Subscriber  
redis.subscribe('notifications');  
redis.on('message', (channel, message) => {  
  console.log(message);  
});

2)PostgreSQL的发布订阅功能

-- Publisher  
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';
// Subscriber (Node.js with pg)  
const client = new Client({ connectionString: process.env.DATABASE_URL });  
await client.connect();  


await client.query('LISTEN notifications');  


client.on('notification', (msg) => {  
  const payload = JSON.parse(msg.payload);  
  console.log(payload);  
});

性能对比:

Redis pub/sub latency: 1-2ms  
Postgres NOTIFY latency: 2-5ms

性能略低,但优势明显:

  • 无需额外部署中间件
  • 可在事务中使用
  • 可与查询语句结合使用

3)实际应用场景:实时日志追踪

在我的日志管理应用中,需要实现日志实时流式推送。

使用Redis:

// When new log arrives  
await db.query('INSERT INTO logs ...');  
await redis.publish('logs:new', JSON.stringify(log));  


// Frontend listens  
redis.subscribe('logs:new');

问题:有两个操作,如果发布失败怎么办?

使用PostgreSQL:

CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$  
BEGIN  
  PERFORM pg_notify('logs_new', row_to_json(NEW)::text);  
  RETURN NEW;  
END;  
$$ LANGUAGE plpgsql;  


CREATE TRIGGER log_inserted  
AFTER INSERT ON logs  
FOR EACH ROW EXECUTE FUNCTION notify_new_log();

现在整个操作是原子性的:插入数据与通知推送,要么同时生效,要么都不执行。

// Frontend (via SSE)  
app.get('/logs/stream', async (req, res) => {  
  const client = await pool.connect();  


    res.writeHead(200, {  
    'Content-Type': 'text/event-stream',  
    'Cache-Control': 'no-cache',  
  });  


    await client.query('LISTEN logs_new');  


      client.on('notification', (msg) => {  
    res.write(`data: ${msg.payload}\n\n`);  
  });  
});

结果:无需Redis即可实现实时日志流传输。

3、基于SKIP LOCKED实现任务队列

Redis(使用Bull或者BullMQ):

queue.add('send-email', { to, subject, body });  


queue.process('send-email', async (job) => {  
  await sendEmail(job.data);  
});

PostgreSQL:

CREATE TABLE jobs (  
  id BIGSERIAL PRIMARY KEY,  
  queue TEXT NOT NULL,  
  payload JSONB NOT NULL,  
  attempts INT DEFAULT 0,  
  max_attempts INT DEFAULT 3,  
  scheduled_at TIMESTAMPTZ DEFAULT NOW(),  
  created_at TIMESTAMPTZ DEFAULT NOW()  
);  


CREATE INDEX idx_jobs_queue ON jobs(queue, scheduled_at)   
WHERE attempts < max_attempts;

入队:

INSERT INTO jobs (queue, payload)  
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}');

工作进程(出队):

WITH next_job AS (  
  SELECT id FROM jobs  
  WHERE queue = $1  
    AND attempts < max_attempts  
    AND scheduled_at <= NOW()  
  ORDER BY scheduled_at  
  LIMIT 1  
  FOR UPDATE SKIP LOCKED  
)  
UPDATE jobs  
SET attempts = attempts + 1  
FROM next_job  
WHERE jobs.id = next_job.id  
RETURNING *;

神奇之处:FOR UPDATE SKIP LOCKED

这让PostgreSQL成为了无锁队列:

  • 多个工作进程可并发拉取任务
  • 任务不会被重复处理
  • 若工作进程崩溃,任务会自动重新变为可执行状态

表现:

Redis BRPOP: 0.1ms  
Postgres SKIP LOCKED: 0.3ms

对于大多数业务负载而言,性能差异可以忽略不计。

4、限流

Redis(经典限流方案):

const key = `ratelimit:${userId}`;  
const count = await redis.incr(key);  
if (count === 1) {  
  await redis.expire(key, 60); // 60 seconds  
}  


if (count > 100) {  
  throw new Error('Rate limit exceeded');  
}

PostgreSQL:

CREATE TABLE rate_limits (  
  user_id INT PRIMARY KEY,  
  request_count INT DEFAULT 0,  
  window_start TIMESTAMPTZ DEFAULT NOW()  
);  


-- Check and increment  
WITH current AS (  
  SELECT   
    request_count,  
    CASE   
      WHEN window_start < NOW() - INTERVAL '1 minute'  
      THEN 1  -- Reset counter  
      ELSE request_count + 1  
    END AS new_count  
  FROM rate_limits  
  WHERE user_id = $1  
  FOR UPDATE  
)  
UPDATE rate_limits  
SET   
  request_count = (SELECT new_count FROM current),  
  window_start = CASE  
    WHEN window_start < NOW() - INTERVAL '1 minute'  
    THEN NOW()  
    ELSE window_start  
  END  
WHERE user_id = $1  
RETURNING request_count;

或者用窗口函数更简单:

CREATE TABLE api_requests (  
  user_id INT NOT NULL,  
  created_at TIMESTAMPTZ DEFAULT NOW()  
);  


-- Check rate limit  
SELECT COUNT(*) FROM api_requests  
WHERE user_id = $1  
  AND created_at > NOW() - INTERVAL '1 minute';  


  -- If under limit, insert  
INSERT INTO api_requests (user_id) VALUES ($1);  


-- Cleanup old requests periodically  
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';

Postgres的适用场景:

  • 需要基于复杂业务逻辑做限流(而非仅简单计数)
  • 希望限流数据与业务逻辑在同一事务中处理

Redis的适用场景:

  • 需要亚毫秒级限流
  • 极高吞吐量(每秒数百万请求)

5、基于JSONB实现会话存储

Redis:

await redis.set(`session:${sessionId}`, JSON.stringify(sessionData), 'EX', 86400);

PostgreSQL:

CREATE TABLE sessions (  
  id TEXT PRIMARY KEY,  
  data JSONB NOT NULL,  
  expires_at TIMESTAMPTZ NOT NULL  
);  


CREATE INDEX idx_sessions_expires ON sessions(expires_at);  


-- Insert/Update  
INSERT INTO sessions (id, data, expires_at)  
VALUES ($1, $2, NOW() + INTERVAL '24 hours')  
ON CONFLICT (id) DO UPDATE  
  SET data = EXCLUDED.data,  
      expires_at = EXCLUDED.expires_at;  


      -- Read  
SELECT data FROM sessions  
WHERE id = $1 AND expires_at > NOW();

附加内容:JSONB 运算符

你可以在会话内部进行查询:

-- Find all sessions for a specific user  
SELECT * FROM sessions  
WHERE data->>'userId' = '123';  


-- Find sessions with specific role  
SELECT * FROM sessions  
WHERE data->'user'->>'role' = 'admin';

你用Redis做不到这一点!

四、实际生产环境基准测试

我用生产数据集完成了基准测试:

1、测试设置

  • 硬件: AWS RDS db.t3.medium(2个虚拟CPU,4GB内存)
  • 数据集:100万条缓存条目,1万个会话
  • 工具:pgbench(自定义脚本)

2、结果

操作 Redis PostgreSQL 不同之处
缓存集 0.05毫秒 0.08毫秒 速度降低 60%
缓存获取 0.04毫秒 0.06毫秒 速度降低 50%
发布订阅 1.2毫秒 3.1毫秒 速度降低 158%
队列推送 0.08毫秒 0.15毫秒 速度降低 87%
队列弹出 0.12毫秒 0.31毫秒 速度降低 158%

PostgreSQL速度较慢,但是:

  • 所有操作耗时均保持在1毫秒以内
  • 省去了与Redis交互的网络开销
  • 降低基础设施复杂性

3、合并执行(真正的胜利)

场景:插入数据 + 缓存失效 + 通知订阅者

使用Redis:

await db.query('INSERT INTO posts ...');       // 2ms  
await redis.del('posts:latest');                // 1ms (network hop)  
await redis.publish('posts:new', data);         // 1ms (network hop)  
// Total: ~4ms

使用PostgreSQL:

BEGIN;  
INSERT INTO posts ...;                          -- 2ms  
DELETE FROM cache WHERE key = 'posts:latest';  -- 0.1ms (same connection)  
NOTIFY posts_new, '...';                        -- 0.1ms (same connection)  
COMMIT;  
-- Total: ~2.2ms

当多个操作合并执行时,PostgreSQL速度更快。

五、哪些场景仍建议保留Redis

如果符合以下条件,请不要替换Redis:

1、需要极致的性能

Redis: 100,000+ ops/sec (single instance)  
Postgres: 10,000-50,000 ops/sec

如果你每秒执行数百万次缓存读取操作,那就继续使用 Redis。

2、使用Redis特有的数据结构

Redis具备:

  • 有序集合(排行榜)
  • HyperLogLog(基数统计)
  • 地理空间索引
  • Streams(高级发布订阅)

PostgreSQL 虽有对应实现,但使用起来更为繁琐:

-- Leaderboard in Postgres (slower)  
SELECT user_id, score  
FROM leaderboard  
ORDER BY score DESC  
LIMIT 10;  


-- vs Redis  
ZREVRANGE leaderboard 0 9 WITHSCORES

3、架构需要独立缓存层

如果你的架构要求独立的缓存层(例如微服务架构),建议保留Redis。

六、迁移方案

不要一夜之间就彻底放弃Redis,以下是我的做法:

第一阶段:并排共存(第1周)

// Write to both  
await redis.set(key, value);  
await pg.query('INSERT INTO cache ...');  


// Read from Redis (still primary)  
let data = await redis.get(key);

监控:对比命中率、延迟。

第二阶段:从Postgres读取数据(第2周)

// Try Postgres first  
let data = await pg.query('SELECT value FROM cache WHERE key = $1', [key]);  


// Fallback to Redis  
if (!data) {  
  data = await redis.get(key);  
}

监控:错误率、性能。

第三阶段:仅写入Postgres(第3周)

// Only write to Postgres  
await pg.query('INSERT INTO cache ...');

监控:所有功能是否正常运行?

第四阶段:移除Redis(第4周)

# Turn off Redis  
# Watch for errors  
# Nothing breaks? Success!

七、代码示例:完整实现

1、缓存模块(PostgreSQL)

// cache.js  
class PostgresCache {  
  constructor(pool) {  
    this.pool = pool;  
  }  


    async get(key) {  
    const result = await this.pool.query(  
      'SELECT value FROM cache WHERE key = $1 AND expires_at > NOW()',  
      [key]  
    );  
    return result.rows[0]?.value;  
  }  


    async set(key, value, ttlSeconds = 3600) {  
    await this.pool.query(  
      `INSERT INTO cache (key, value, expires_at)  
       VALUES ($1, $2, NOW() + INTERVAL '${ttlSeconds} seconds')  
       ON CONFLICT (key) DO UPDATE  
         SET value = EXCLUDED.value,  
             expires_at = EXCLUDED.expires_at`,  
      [key, value]  
    );  
  }  


    async delete(key) {  
    await this.pool.query('DELETE FROM cache WHERE key = $1', [key]);  
  }  


    async cleanup() {  
    await this.pool.query('DELETE FROM cache WHERE expires_at < NOW()');  
  }  
}  


module.exports = PostgresCache;

2、发布订阅模块

// pubsub.js  
class PostgresPubSub {  
  constructor(pool) {  
    this.pool = pool;  
    this.listeners = new Map();  
  }  


    async publish(channel, message) {  
    const payload = JSON.stringify(message);  
    await this.pool.query('SELECT pg_notify($1, $2)', [channel, payload]);  
  }  


    async subscribe(channel, callback) {  
    const client = await this.pool.connect();  
    await client.query(`LISTEN ${channel}`);  
    client.on('notification', (msg) => {  
      if (msg.channel === channel) {  
        callback(JSON.parse(msg.payload));  
      }  
    });  


        this.listeners.set(channel, client);  
  }  


    async unsubscribe(channel) {  
    const client = this.listeners.get(channel);  
    if (client) {  
      await client.query(`UNLISTEN ${channel}`);  
      client.release();  
      this.listeners.delete(channel);  
    }  
  }  
}  


module.exports = PostgresPubSub;

3、任务队列模块

// queue.js  
class PostgresQueue {  
  constructor(pool) {  
    this.pool = pool;  
  }  


    async enqueue(queue, payload, scheduledAt = newDate()) {  
    await this.pool.query(  
      'INSERT INTO jobs (queue, payload, scheduled_at) VALUES ($1, $2, $3)',  
      [queue, payload, scheduledAt]  
    );  
  }  


    async dequeue(queue) {  
    const result = await this.pool.query(  
      `WITH next_job AS (  
        SELECT id FROM jobs  
        WHERE queue = $1  
          AND attempts < max_attempts  
          AND scheduled_at <= NOW()  
        ORDER BY scheduled_at  
        LIMIT 1  
        FOR UPDATE SKIP LOCKED  
      )  
      UPDATE jobs  
      SET attempts = attempts + 1  
      FROM next_job  
      WHERE jobs.id = next_job.id  
      RETURNING jobs.*`,  
      [queue]  
    );  


        return result.rows[0];  
  }  


    async complete(jobId) {  
    await this.pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);  
  }  


    async fail(jobId, error) {  
    await this.pool.query(  
      `UPDATE jobs  
       SET attempts = max_attempts,  
           payload = payload || jsonb_build_object('error', $2)  
       WHERE id = $1`,  
      [jobId, error.message]  
    );  
  }  
}  


module.exports = PostgresQueue;

八、性能优化技巧

1、使用连接池

const { Pool } = require('pg');  


const pool = new Pool({  
  max: 20,  // Max connections  
  idleTimeoutMillis: 30000,  
  connectionTimeoutMillis: 2000,  
});

2、添加合适的索引

CREATE INDEX CONCURRENTLY idx_cache_key ON cache(key) WHERE expires_at > NOW();  
CREATE INDEX CONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at)   
  WHERE attempts < max_attempts;

3、调整PostgreSQL配置

# postgresql.conf  
shared_buffers = 2GB           # 25% of RAM  
effective_cache_size = 6GB     # 75% of RAM  
work_mem = 50MB                # For complex queries  
maintenance_work_mem = 512MB   # For VACUUM

4、定期维护

-- Run daily  
VACUUM ANALYZE cache;  
VACUUM ANALYZE jobs;  


-- Or enable autovacuum (recommended)  
ALTER TABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);

九、三个月后的结果

我省下了:

  • 每月100美元(不再使用 ElastiCache)
  • 备份复杂性降低50%
  • 少监控一项服务
  • 更简单的部署(减少一项依赖)

我失去了:

  • 缓存操作延迟则增加约0.5毫秒
  • Redis特有的数据结构(其实并不需要)

我会再次这样做吗?就这个业务场景而言:会。

是否推荐所有人都这么做?不推荐。

十、决策矩阵

如果满足以下条件,可用Postgres替换Redis:

  • 仅用Redis做简单缓存或者会话管理
  • 缓存命中率低于95%(写入次数过多)
  • 需要事务一致性
  • 可以接受操作速度慢0.1-1毫秒
  • 团队规模小,运维资源有限

以下场景建议保留Redis:

  • 每秒10万次以上的操作量
  • 使用Redis特有的数据结构(有序集合等)
  • 配备专业的运维团队
  • 亚毫秒级延迟为核心要求
  • 需要跨区域地理复制

十一、参考资料

1、PostgreSQL 特性

  • LISTEN/NOTIFY 官方文档
  • SKIP LOCKED 语法
  • UNLOGGED 表

2、工具

  • pgBouncer - 连接池
  • pg_stat_statements - 查询性能

3、其他解决方案

  • Graphile Worker - 基于Postgres的任务队列
  • pg-boss - 另一款Postgres队列实现

十二、最后

我用PostgreSQL替换了Redis的这些场景:

  • 缓存 → UNLOGGED 表
  • 发布订阅 → LISTEN/NOTIFY
  • 任务队列 → SKIP LOCKED
  • 会话存储 → JSONB 表

结果:

  • 每月节省100美元
  • 降低了运维复杂度
  • 性能略有下降(延迟增加 0.1–1ms),但可接受
  • 保证了事务一致性

适合这样做的场景:

  • 中小型应用
  • 简单的缓存需求
  • 希望减少系统组件、简化架构

不适合这样做的场景:

  • 性能要求高(每秒10万次以上操作)
  • 用Redis特有的功能
  • 配备专职运维团队

你是否用过Postgres替换Redis(或反过来用Redis替换Postgres)?实际体验如何?欢迎在评论区分享你的基准测试数据!

作者丨Polliog      编译丨dbaplus社群

来源丨网址:https://dev.to/polliog/i-replaced-redis-with-postgresql-and-its-faster-4942

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn

图片

图片