• 微信:WANCOME
  • 扫码加微信,提供专业咨询
  • 服务热线
  • 13215191218
    13027920428

  • 微信扫码访问本页
Ollama并发调用
使用 ollama-python 包如何高效地对本地 Llama 3.1 模型发起并发调用?

使用 ollama-python 包如何高效地对本地 Llama 3.1 模型发起并发调用?

问题分析

Ollama 作为本地 LLM 部署的热门工具,提供了 Python SDK 简化调用流程。然而,当需要处理批量请求时(如对 1000 个文档进行摘要),顺序调用会导致总耗时等于所有请求耗时之和,实际场景中可能需要数小时。

问题的根源在于 Ollama 的推理引擎是串行设计的。默认配置下,Ollama 服务器一次只处理一个请求,后续请求在队列中等待。即使客户端发起并发调用,服务器端仍是串行执行。

并发优化的第一个瓶颈是模型加载。Ollama 在收到第一个请求时将模型加载到显存,这个过程可能耗时几十秒(取决于模型大小和硬件)。如果每个并发请求都触发独立加载,资源开销巨大。

第二个瓶颈是 GPU 资源竞争。即使 Ollama 支持多实例,每个实例都需要独立显存。单张 24GB 显存的 RTX 4090 运行 Llama-3.1-8B 模型后,剩余显存可能不足以加载第二个实例。

第三个瓶颈是 Python 的 GIL(全局解释器锁)。使用 threading 模块的多线程无法真正并行执行 CPU 密集任务,需要使用 multiprocessing 或 asyncio。

解决原理

实现高效并发需要多层面优化:

策略一:异步 IO + 连接池

Ollama Python SDK 提供了异步接口(ollama.AsyncClient)。结合 Python 的 asyncio,可以在单个线程中并发发起多个请求,由网络 IO 的等待时间重叠实现加速。

关键是设置合适的连接池大小。默认情况下,httpx(底层 HTTP 库)限制并发连接数为 100。对于大量并发请求,可以调整 limits 参数。

策略二:批处理请求

将多个输入合并为一个批量请求。例如,将 10 个摘要请求合并为一个"对以下 10 段文本分别生成摘要"的请求。这种方式减少了请求往返次数,但需要 Prompt 设计确保输出格式可解析。

策略三:多进程并行

对于 CPU 密集的预处理或后处理任务,使用 multiprocessing.Pool 实现真正的多核并行。推理请求仍是异步发送,但数据准备和结果解析在不同进程完成。

策略四:Ollama 服务器配置

修改 Ollama 的环境变量 OLLAMA_MAX_LOADED_MODELS 控制同时加载的模型实例数。设置为大于 1 可以支持真正的多请求并行,但需要足够显存。

程序实现与说明

"""
Ollama Python 并发调用优化实现
展示多种并发策略及性能对比
"""

import asyncio
import time
from typing import List, Dict, Any
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ollama
from ollama import AsyncClient

# ================== 策略一:异步并发请求 ==================

async def async_generate_batch(
    model: str,
    prompts: List[str],
    max_concurrent: int = 5
) -> List[Dict[str, Any]]:
    """
    异步并发生成请求
    使用信号量控制并发数量,避免过载
    
    model: 模型名称(如 "llama3.1:8b")
    prompts: 提示词列表
    max_concurrent: 最大并发请求数
    """
    # 创建异步客户端
    client = AsyncClient(
        host='http://localhost:11434',
        # 设置连接池限制
        headers={'Content-Type': 'application/json'}
    )
    
    # 信号量控制并发数量
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def generate_single(prompt: str, index: int) -> Dict[str, Any]:
        """单个请求的异步函数"""
        async with semaphore:  # 获取许可
            start_time = time.time()
            
            try:
                # 异步调用 generate
                response = await client.generate(
                    model=model,
                    prompt=prompt,
                    stream=False  # 禁用流式输出
                )
                
                elapsed = time.time() - start_time
                
                return {
                    'index': index,
                    'prompt': prompt[:50] + '...',  # 截断显示
                    'response': response['response'],
                    'elapsed': elapsed,
                    'status': 'success'
                }
                
            except Exception as e:
                return {
                    'index': index,
                    'prompt': prompt[:50] + '...',
                    'error': str(e),
                    'status': 'failed'
                }
    
    # 创建所有任务
    tasks = [
        generate_single(prompt, i)
        for i, prompt in enumerate(prompts)
    ]
    
    # 并发执行所有任务
    results = await asyncio.gather(*tasks)
    
    # 关闭客户端
    await client._client.aclose()
    
    return results


# ================== 策略二:批处理请求 ==================

def batch_generate(
    model: str,
    items: List[str],
    batch_size: int = 10
) -> List[str]:
    """
    批处理生成
    将多个输入合并到单个请求中,减少网络往返
    
    model: 模型名称
    items: 待处理项目列表
    batch_size: 每批包含的项目数
    """
    results = []
    
    # 批次处理
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        
        # 构建批处理 Prompt
        batch_prompt = f"""请对以下 {len(batch)} 段文本分别生成一句话摘要。
每段文本的摘要单独一行,格式为"第N段:摘要内容"。

文本列表:
"""
        for idx, item in enumerate(batch, 1):
            batch_prompt += f"\n【第{idx}段】\n{item}\n"
        
        batch_prompt += "\n请开始生成摘要:"
        
        # 发送请求
        response = ollama.generate(
            model=model,
            prompt=batch_prompt,
            stream=False
        )
        
        # 解析结果
        output_text = response['response']
        # 按行分割并提取摘要
        lines = output_text.strip().split('\n')
        batch_results = []
        for line in lines:
            # 尝试解析"第N段:内容"格式
            if ':' in line or ':' in line:
                parts = line.replace(':', ':').split(':', 1)
                if len(parts) == 2:
                    batch_results.append(parts[1].strip())
        
        # 如果解析失败,按行返回
        if len(batch_results) != len(batch):
            batch_results = [line.strip() for line in lines if line.strip()][:len(batch)]
        
        results.extend(batch_results)
    
    return results


# ================== 策略三:多进程 + 异步混合 ==================

def process_single_item(args: tuple) -> Dict[str, Any]:
    """
    单进程处理函数
    用于多进程池中的任务执行
    """
    model, prompt, index = args
    
    # 每个进程创建独立的 Ollama 客户端
    response = ollama.generate(
        model=model,
        prompt=prompt,
        stream=False
    )
    
    return {
        'index': index,
        'response': response['response'],
        'model': model
    }


def multiprocess_generate(
    model: str,
    prompts: List[str],
    num_workers: int = 4
) -> List[Dict[str, Any]]:
    """
    多进程并行生成
    利用多核 CPU 并行发起请求
    
    num_workers: 进程池大小,建议设为 CPU 核心数
    """
    # 准备参数
    args_list = [
        (model, prompt, i)
        for i, prompt in enumerate(prompts)
    ]
    
    # 使用进程池
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(process_single_item, args_list))
    
    # 按原始顺序排序
    results.sort(key=lambda x: x['index'])
    
    return results


# ================== 性能对比测试 ==================

def benchmark_concurrent_strategies():
    """
    对比不同并发策略的性能
    """
    model_name = "llama3.1:8b"
    
    # 准备测试数据
    test_prompts = [
        f"请用一句话解释什么是{topic}。" 
        for topic in ["人工智能", "机器学习", "深度学习", "神经网络", "自然语言处理",
                     "计算机视觉", "强化学习", "生成对抗网络", "Transformer", "BERT"]
    ]
    
    print("=" * 60)
    print("并发策略性能对比测试")
    print(f"模型: {model_name}")
    print(f"请求数: {len(test_prompts)}")
    print("=" * 60)
    
    # 测试 1:顺序调用(基准)
    print("\n[测试 1] 顺序调用(基准)")
    start = time.time()
    sequential_results = []
    for prompt in test_prompts:
        response = ollama.generate(model=model_name, prompt=prompt, stream=False)
        sequential_results.append(response['response'])
    sequential_time = time.time() - start
    print(f"  耗时: {sequential_time:.2f}s")
    
    # 测试 2:异步并发
    print("\n[测试 2] 异步并发(并发数=5)")
    start = time.time()
    async_results = asyncio.run(async_generate_batch(model_name, test_prompts, max_concurrent=5))
    async_time = time.time() - start
    print(f"  耗时: {async_time:.2f}s")
    print(f"  加速比: {sequential_time/async_time:.2f}x")
    
    # 测试 3:批处理
    print("\n[测试 3] 批处理(批次大小=10)")
    start = time.time()
    batch_results = batch_generate(model_name, test_prompts, batch_size=10)
    batch_time = time.time() - start
    print(f"  耗时: {batch_time:.2f}s")
    print(f"  加速比: {sequential_time/batch_time:.2f}x")
    
    # 测试 4:多进程
    print("\n[测试 4] 多进程(进程数=4)")
    start = time.time()
    mp_results = multiprocess_generate(model_name, test_prompts, num_workers=4)
    mp_time = time.time() - start
    print(f"  耗时: {mp_time:.2f}s")
    print(f"  加速比: {sequential_time/mp_time:.2f}x")
    
    # 汇总
    print("\n" + "=" * 60)
    print("性能汇总")
    print("=" * 60)
    print(f"顺序调用:  {sequential_time:.2f}s (基准)")
    print(f"异步并发:  {async_time:.2f}s ({sequential_time/async_time:.2f}x 加速)")
    print(f"批处理:   {batch_time:.2f}s ({sequential_time/batch_time:.2f}x 加速)")
    print(f"多进程:    {mp_time:.2f}s ({sequential_time/mp_time:.2f}x 加速)")


# ================== 最佳实践封装 ==================

class OllamaConcurrentClient:
    """
    封装了最佳并发实践的 Ollama 客户端
    """
    
    def __init__(self, model: str, max_concurrent: int = 3):
        self.model = model
        self.max_concurrent = max_concurrent
        self._async_client = None
    
    async def __aenter__(self):
        self._async_client = AsyncClient(host='http://localhost:11434')
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._async_client:
            await self._async_client._client.aclose()
    
    async def generate_batch(self, prompts: List[str]) -> List[str]:
        """异步批量生成"""
        semaphore = asyncio.Semaphore(self.max_concurrent)
        
        async def single_request(prompt: str) -> str:
            async with semaphore:
                response = await self._async_client.generate(
                    model=self.model,
                    prompt=prompt,
                    stream=False
                )
                return response['response']
        
        tasks = [single_request(p) for p in prompts]
        return await asyncio.gather(*tasks)
    
    def generate_batch_sync(self, prompts: List[str]) -> List[str]:
        """同步接口(内部使用异步)"""
        return asyncio.run(self.generate_batch(prompts))


if __name__ == "__main__":
    # 运行性能对比
    benchmark_concurrent_strategies()

关键代码行解析:

asyncio.Semaphore(max_concurrent):信号量控制并发数量。当并发请求过多时,可能导致 Ollama 服务器资源耗尽或超时。

await client.generate(...):异步调用,不阻塞事件循环。多个请求可以同时等待响应,而非顺序等待。

batch_prompt = f"请对以下 {len(batch)} 段文本...":批处理 Prompt 设计。关键是要明确告诉模型输出格式,便于后续解析。

ProcessPoolExecutor(max_workers=num_workers):多进程池。每个进程有独立的 Python 解释器,绕过 GIL 限制。

性能优化建议:

  1. 根据硬件设置并发数:单 GPU 建议 2-5,多 GPU 可更高。
  2. 预热模型:首次请求前运行一次简单请求,触发模型加载。
  3. 监控显存:watch -n 1 nvidia-smi 观察显存占用,调整并发数。
  4. 设置超时:asyncio.wait_for(request, timeout=60) 避免请求无限等待。