泊松过程实战:用Python模拟排队系统(附完整代码)

张开发
2026/4/16 9:05:16 15 分钟阅读

分享文章

泊松过程实战:用Python模拟排队系统(附完整代码)
泊松过程实战用Python模拟排队系统附完整代码在超市收银台前等待结账时你是否好奇过商家如何预测客流高峰客服中心又是如何计算需要多少坐席才能减少客户等待时间这些问题的答案都藏在一个强大的数学工具里——泊松过程。本文将带你用Python从零构建排队系统模拟器通过代码直观理解这一概率模型在真实场景中的应用价值。不同于传统概率论教材中抽象的数学推导我们将采用问题→建模→代码→可视化的工程思维路径。你会学到如何用scipy.stats生成符合真实场景的随机事件流用matplotlib动态展示队列变化并通过对银行、超市等案例的模拟掌握优化服务资源配置的实用技巧。1. 环境准备与基础概念在开始编码前我们需要明确几个关键概念。泊松过程描述的是事件以恒定平均速率随机且独立发生的过程比如客服电话呼入超市顾客到达收银台服务器请求到达这些场景的共同特点是事件发生相互独立单位时间平均发生次数稳定极短时间内几乎不会同时发生多个事件安装所需Python库pip install numpy scipy matplotlib pandas验证安装import scipy.stats as stats print(stats.expon.rvs(scale1/0.5, size5)) # 测试指数分布生成2. 核心算法实现2.1 事件间隔生成泊松过程的关键特性是事件间隔服从指数分布。假设某咖啡厅平均每分钟接待2位顾客(λ2)import numpy as np def generate_arrivals(lambda_param, duration): intervals np.random.exponential(scale1/lambda_param, sizeint(lambda_param*duration*1.2)) arrivals np.cumsum(intervals) return arrivals[arrivals duration] # 生成1小时内顾客到达时间 arrivals generate_arrivals(lambda_param2, duration60) print(f共{len(arrivals)}位顾客到达时间示例{arrivals[:5]})参数说明lambda_param: 单位时间平均事件数duration: 模拟总时长scale1/lambda_param: 指数分布的均值2.2 单服务台队列模拟构建一个完整的单服务台排队系统class SingleServerQueue: def __init__(self, arrival_rate, service_rate): self.arrival_rate arrival_rate # 到达率(人/分钟) self.service_rate service_rate # 服务率(人/分钟) def simulate(self, duration): # 生成到达时间 arrivals generate_arrivals(self.arrival_rate, duration) n_customers len(arrivals) # 生成服务时间 service_times np.random.exponential(scale1/self.service_rate, sizen_customers) # 计算等待时间 departures np.zeros(n_customers) wait_times np.zeros(n_customers) departures[0] arrivals[0] service_times[0] for i in range(1, n_customers): if arrivals[i] departures[i-1]: wait_times[i] departures[i-1] - arrivals[i] departures[i] arrivals[i] wait_times[i] service_times[i] return { arrivals: arrivals, service_times: service_times, wait_times: wait_times, departures: departures }使用示例queue SingleServerQueue(arrival_rate3, service_rate4) results queue.simulate(duration120)3. 可视化与分析3.1 时间线可视化import matplotlib.pyplot as plt def plot_timeline(results): fig, ax plt.subplots(figsize(10, 6)) for i, (arrival, service, wait, departure) in enumerate( zip(results[arrivals], results[service_times], results[wait_times], results[departures])): ax.plot([arrival, arrival wait], [i, i], y-, linewidth2) # 等待阶段 ax.plot([arrival wait, departure], [i, i], g-, linewidth2) # 服务阶段 ax.plot(arrival, i, bo) # 到达时刻 ax.set_xlabel(时间 (分钟)) ax.set_ylabel(顾客序号) ax.set_title(排队过程时间线) plt.grid(True) plt.show() plot_timeline(results)3.2 关键指标计算def calculate_metrics(results): metrics {} wait_times results[wait_times] metrics[avg_wait] np.mean(wait_times) metrics[max_wait] np.max(wait_times) metrics[utilization] np.sum(results[service_times]) / results[departures][-1] # 等待超过5分钟的顾客比例 metrics[pct_long_waits] np.mean(wait_times 5) * 100 return metrics metrics calculate_metrics(results) print(f平均等待时间: {metrics[avg_wait]:.2f}分钟) print(f系统利用率: {metrics[utilization]:.1%})4. 多服务台扩展与优化4.1 多柜台模拟实现扩展为多服务台系统如银行有多个窗口class MultiServerQueue: def __init__(self, arrival_rate, service_rate, n_servers): self.arrival_rate arrival_rate self.service_rate service_rate self.n_servers n_servers def simulate(self, duration): arrivals generate_arrivals(self.arrival_rate, duration) n_customers len(arrivals) service_times np.random.exponential(scale1/self.service_rate, sizen_customers) # 初始化服务台空闲时间 servers_free np.zeros(self.n_servers) wait_times np.zeros(n_customers) departures np.zeros(n_customers) for i in range(n_customers): earliest_free np.min(servers_free) if arrivals[i] earliest_free: wait_times[i] earliest_free - arrivals[i] else: wait_times[i] 0 # 分配最早空闲的服务台 server_idx np.argmin(servers_free) departures[i] arrivals[i] wait_times[i] service_times[i] servers_free[server_idx] departures[i] return { arrivals: arrivals, service_times: service_times, wait_times: wait_times, departures: departures }4.2 服务台数量优化通过模拟寻找最优服务台数量def optimize_servers(arrival_rate, service_rate, max_servers10, duration480): results [] for n in range(1, max_servers1): queue MultiServerQueue(arrival_rate, service_rate, n) sim_results queue.simulate(duration) metrics calculate_metrics(sim_results) metrics[n_servers] n results.append(metrics) return pd.DataFrame(results) # 示例呼叫中心每小时接到50通电话平均处理时间5分钟 df_results optimize_servers(arrival_rate50/60, service_rate1/5) print(df_results[[n_servers, avg_wait, utilization]])典型输出n_serversavg_waitutilization145.20.98212.70.8533.10.7240.80.625. 真实场景应用案例5.1 超市收银台配置假设某超市高峰时段顾客到达率120人/小时平均结账时间2分钟/人可开放收银台4-8个模拟代码supermarket_results optimize_servers( arrival_rate120/60, service_rate1/2, max_servers8, duration120 # 模拟2小时高峰 ) plt.figure(figsize(10,4)) plt.plot(supermarket_results[n_servers], supermarket_results[avg_wait], o-) plt.xlabel(收银台数量) plt.ylabel(平均等待时间(分钟)) plt.title(不同收银台配置下的顾客等待时间) plt.grid(True) plt.show()5.2 急诊室分诊系统更复杂的优先级队列模拟class PriorityQueue: def __init__(self, arrival_rates, service_rates, n_servers): arrival_rates: 各优先级到达率列表 service_rates: 各优先级服务率列表 self.arrival_rates arrival_rates self.service_rates service_rates self.n_servers n_servers def simulate(self, duration): # 生成各优先级顾客到达时间 all_arrivals [] priorities [] for prio, rate in enumerate(self.arrival_rates): arrivals generate_arrivals(rate, duration) all_arrivals.extend(arrivals) priorities.extend([prio]*len(arrivals)) # 按到达时间排序 idx_sorted np.argsort(all_arrivals) arrivals_sorted np.array(all_arrivals)[idx_sorted] priorities_sorted np.array(priorities)[idx_sorted] # 生成服务时间 service_times np.zeros_like(arrivals_sorted) for prio in set(priorities_sorted): mask priorities_sorted prio service_times[mask] np.random.exponential( scale1/self.service_rates[prio], sizenp.sum(mask)) # 模拟排队过程 servers_free np.zeros(self.n_servers) wait_times np.zeros(len(arrivals_sorted)) departures np.zeros(len(arrivals_sorted)) for i in range(len(arrivals_sorted)): earliest_free np.min(servers_free) if arrivals_sorted[i] earliest_free: wait_times[i] earliest_free - arrivals_sorted[i] # 总是分配给最早空闲的服务台 server_idx np.argmin(servers_free) departures[i] arrivals_sorted[i] wait_times[i] service_times[i] servers_free[server_idx] departures[i] return { arrivals: arrivals_sorted, service_times: service_times, wait_times: wait_times, departures: departures, priorities: priorities_sorted }使用示例# 急诊室三类患者危急(0)、紧急(1)、普通(2) er_queue PriorityQueue( arrival_rates[5/60, 10/60, 15/60], # 人/分钟 service_rates[1/20, 1/15, 1/10], # 人/分钟 n_servers5 ) er_results er_queue.simulate(duration8*60) # 8小时 # 按优先级分析等待时间 for prio in [0, 1, 2]: mask er_results[priorities] prio avg_wait np.mean(er_results[wait_times][mask]) print(f优先级{prio}平均等待时间: {avg_wait:.1f}分钟)

更多文章