dbos.dev

DBOS simplifies building reliable and observable programs. Add a few annotations to your code to make it resilient to any failure.

  1. business logic in normal code, with branches, loops, subtasks, and retries. DBOS makes it resilient to any failure.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @DBOS.step()
    def validate_payment():
    ...

    @DBOS.workflow()
    def checkout_workflow()
    validate_payment()
    check_inventory()
    ship_order()
    notify_customer()
  2. Launch any task to run in the background and guarantee it eventually completes.

    1
    2
    3
    4
    5
    6
    7
    8
    @DBOS.workflow()
    def schedule_reminder(to_email, days_to_wait):
    DBOS.recv(days_to_seconds(days_to_wait))
    send_reminder_email(to_email, days_to_wait)

    @app.post("/email")
    def email_endpoint(request):
    DBOS.start_workflow(schedule_reminder, request.email, request.days)
  3. Schedule functions to run at specific times.

    1
    2
    3
    4
    5
    6
    @DBOS.scheduled("0 * * * *") # Run once an hour
    @DBOS.workflow()
    def run_hourly(scheduled_time, actual_time):
    results = search_hackernews("serverless")
    for comment, url in results:
    post_to_slack(comment, url)
  4. Build data pipelines that are reliable and observable by default. DBOS durable queues guarantee all your tasks complete.

    1
    2
    3
    4
    5
    6
    7
    8
    queue = Queue("indexing_queue")

    @DBOS.workflow()
    def indexing_workflow(urls):
    handles = []
    for url in urls:
    handles.append(queue.enqueue(index_step, url))
    return [h.get_result() for h in handles]
  5. Consume Kafka messages exactly-once, no need to worry about timeouts or offsets.

    1
    2
    3
    4
    5
    6
    @DBOS.kafka_consumer(config,["alerts-topic"])
    @DBOS.workflow()
    def process_kafka_alerts(msg):
    alerts = msg.value.decode()
    for alert in alerts:
    respond_to_alert(alert)

    https://docs.dbos.dev/python/tutorials/kafka-integration

  6. Use durable workflows to build reliable, fault-tolerant AI agents. Integrate with popular frameworks like LangChain and LlamaIndex.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @DBOS.workflow()
    def agentic_research_workflow(topic, max_iterations):
    research_results = []
    for i in range(max_iterations):
    research_result = research_query(topic)
    research_results.append(research_result)
    if not should_continue(research_results):
    break
    topic = generate_next_topic(topic, research_results)
    return synthesize_research_report(research_results)

    @DBOS.step()
    def research_query(topic):
    ...

在金融系统中的优势分析:

  1. 天然适合银行交易场景:DBOS的工作流模式完美匹配银行的多步骤交易处理需求
  2. PostgreSQL基础:许多银行已经在使用PostgreSQL,技术栈融合度高
  3. 故障恢复能力:关键业务操作的自动恢复功能对银行至关重要
  4. 代码简洁性:通过注解方式实现持久化,降低开发复杂度

潜在挑战:

  1. 监管合规:需要验证DBOS是否满足银行业的严格审计要求
  2. 性能要求:大型银行的高并发交易处理能力需要充分测试
  3. 企业集成:与现有核心银行系统的集成复杂性
  4. 安全标准:是否符合PCI-DSS、SOX等金融行业安全标准
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// case 1:客户服务流程
@DBOS.workflow()
async function loanApplication(applicationId: string) {
await documentVerification(applicationId);
await creditScoreCheck(applicationId);
await riskAssessment(applicationId);
await decisionMaking(applicationId);
await customerNotification(applicationId);
}

// case 2:内部资金管理
@DBOS.workflow()
async function liquidityManagement(branchId: string, amount: number) {
await calculateLiquidityNeeds(branchId);
await allocateFunds(branchId, amount);
await updateCentralLedger(branchId, amount);
await generateLiquidityReport(branchId, amount);
}

// case 3:核心交易处理
@DBOS.workflow()
async function realTimePayment(payment: PaymentRequest) {
// 实时支付处理,具备完整的故障恢复能力
await fraudDetection(payment);
await fundsVerification(payment);
await executePayment(payment);
await settlementProcessing(payment);
await complianceReporting(payment);
}

DBOS实际上是多种设计模式的巧妙组合:

  1. 责任链模式 (Chain of Responsibility)
1
2
3
4
5
6
7
8
9
// 经典责任链:每个处理器处理请求或传递给下一个
@DBOS.workflow()
async function loanApproval(application: LoanApplication) {
await documentVerification(application); // 处理器1:文档验证
await creditScoreCheck(application); // 处理器2:征信检查
await incomeVerification(application); // 处理器3:收入验证
await riskAssessment(application); // 处理器4:风险评估
await finalApproval(application); // 处理器5:最终审批
}
  1. Saga模式 (最核心的模式)

这是DBOS最准确的模式匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Saga模式:长事务的分解与补偿
@DBOS.workflow()
async function transferMoney(transfer: MoneyTransfer) {
try {
// 正向操作序列
await debitSourceAccount(transfer.from, transfer.amount);
await creditTargetAccount(transfer.to, transfer.amount);
await recordTransaction(transfer);
await sendNotification(transfer);
} catch (error) {
// 补偿操作序列(回滚)
await creditSourceAccount(transfer.from, transfer.amount);
await cancelTransaction(transfer);
throw error;
}
}
  1. 命令模式 + 备忘录模式

  2. 1
    2
    3
    4
    5
    6
    7
    // 每个步骤都是可存储和重放的命令
    @DBOS.step()
    async function executePayment(payment: Payment) {
    // DBOS在PostgreSQL中存储此步骤的状态
    // 失败时可从此检查点重放
    return await processPayment(payment);
    }
  3. 状态机模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 银行交易状态机
@DBOS.workflow()
async function paymentStateMachine(payment: Payment) {
// 状态转换:待处理 -> 验证中
await validatePayment(payment);

// 状态转换:验证中 -> 资金冻结
await freezeFunds(payment);

// 状态转换:资金冻结 -> 执行中
await executePayment(payment);

// 状态转换:执行中 -> 已完成
await completePayment(payment);
}
  1. Oracle Database - transaction procedure

核心相似性分析

  1. 事务边界管理

    1
    2
    3
    4
    5
    6
    7
    8
    -- Oracle: 显式事务控制
    BEGIN
    SAVEPOINT sp1;
    -- operations
    COMMIT;
    EXCEPTION
    WHEN OTHERS THEN ROLLBACK TO sp1;
    END;
  2. 状态持久化

    1
    2
    3
    4
    5
    6
    7
    -- Oracle: 数据库状态 + 过程状态
    CREATE TABLE workflow_state (
    workflow_id NUMBER,
    step_name VARCHAR2(100),
    step_status VARCHAR2(20),
    step_data CLOB
    );
  3. 错误处理与补偿

    1
    2
    3
    4
    5
    6
    7
    -- Oracle: 异常处理
    EXCEPTION
    WHEN custom_exception THEN
    -- 补偿逻辑
    ROLLBACK TO savepoint;
    -- 清理操作
    END;
  4. 批处理与队列处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    -- Oracle: 批处理存储过程
    CREATE OR REPLACE PROCEDURE PROCESS_DAILY_BATCH AS
    CURSOR c_transactions IS
    SELECT * FROM pending_transactions;
    BEGIN
    FOR rec IN c_transactions LOOP
    PROCESS_SINGLE_TRANSACTION(rec.txn_id);
    END LOOP;
    COMMIT;
    END;

模式对比表

特性 Oracle存储过程 DBOS工作流
事务边界 BEGIN/COMMIT/ROLLBACK @DBOS.workflow()
步骤控制 嵌套BEGIN块 @DBOS.step()
状态保存 SAVEPOINT 自动检查点
错误恢复 EXCEPTION处理 自动重启恢复
数据持久化 Oracle表空间 PostgreSQL
并发控制 FOR UPDATE锁 PostgreSQL锁
DBOS的现代化改进:
  • 分布式执行能力
  • 自动故障恢复
  • 云原生架构友好
  • 更简洁的错误处理