Post

加权随机选择算法

加权随机选择算法

别名算法

go 版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main

import (
	"errors"
	"math/rand"
	"time"
)

type WeightedRandom struct {
	values  []interface{}
	weights []float64
	aliases []aliasEntry
	count   int
}

type aliasEntry struct {
	prob  float64
	alias int
}

// NewWeightedRandom 创建一个新的加权随机选择器
func NewWeightedRandom(values []interface{}, weights []float64) (*WeightedRandom, error) {
	if len(values) != len(weights) {
		return nil, errors.New("values and weights must have the same length")
	}

	count := len(weights)
	if count == 0 {
		return nil, errors.New("values and weights cannot be empty")
	}

	// 计算总权重和平均权重
	total := 0.0
	for _, w := range weights {
		total += w
	}
	avg := total / float64(count)

	// 初始化别名表
	aliases := make([]aliasEntry, count)
	for i := range aliases {
		aliases[i] = aliasEntry{prob: 1.0, alias: -1}
	}

	// 分离小权重和大权重
	small := make([]int, 0, count)
	large := make([]int, 0, count)

	for i, w := range weights {
		normalized := w / avg
		if normalized < 1.0 {
			small = append(small, i)
		} else {
			large = append(large, i)
		}
	}

	// 处理小权重和大权重
	for len(small) > 0 && len(large) > 0 {
		smallIdx := small[len(small)-1]
		small = small[:len(small)-1]
		smallProb := weights[smallIdx] / avg

		largeIdx := large[len(large)-1]
		large = large[:len(large)-1]
		largeProb := weights[largeIdx] / avg

		// 设置别名表
		aliases[smallIdx] = aliasEntry{
			prob:  smallProb,
			alias: largeIdx,
		}

		// 调整大权重的剩余量
		remaining := largeProb - (1.0 - smallProb)

		if remaining < 1.0 {
			small = append(small, largeIdx)
			weights[largeIdx] = remaining * avg
		} else {
			large = append(large, largeIdx)
			weights[largeIdx] = remaining * avg
		}
	}

	// 处理剩余的小权重或大权重
	for _, idx := range small {
		aliases[idx] = aliasEntry{prob: 1.0, alias: -1}
	}

	for _, idx := range large {
		aliases[idx] = aliasEntry{prob: 1.0, alias: -1}
	}

	// 初始化随机种子
	rand.Seed(time.Now().UnixNano())

	return &WeightedRandom{
		values:  values,
		weights: weights,
		aliases: aliases,
		count:   count,
	}, nil
}

// Next 返回一个随机选择的值及其权重
func (wr *WeightedRandom) Next() (interface{}, float64) {
	// 随机选择一个桶
	n := rand.Float64() * float64(wr.count)
	i := int(n)
	entry := wr.aliases[i]

	// 决定返回原始索引还是别名索引
	var selectedIdx int
	if (n-float64(i)) > entry.prob && entry.alias != -1 {
		selectedIdx = entry.alias
	} else {
		selectedIdx = i
	}

	return wr.values[selectedIdx], wr.weights[selectedIdx]
}

python 版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import random
from typing import List, Tuple, Any, Callable

def prepare_weighted_random(values: List[Any], weights: List[float]) -> Callable[[], Tuple[Any, float]]:
    """
    准备加权随机选择函数(别名方法实现)
    
    参数:
        values: 要从中选择的值列表
        weights: 对应的权重列表
        
    返回:
        一个函数,每次调用返回一个随机选择的值及其权重
    """
    assert len(values) == len(weights), "values和weights长度必须相同"
    
    count = len(weights)
    total = sum(weights)
    avg = total / count  # 平均权重
    
    # 初始化别名表,每个元素是(概率, 别名索引)
    aliases = [(1.0, None) for _ in range(count)]
    
    # 找出小于和大于平均权重的索引
    small = []
    large = []
    
    for i, w in enumerate(weights):
        normalized = w / avg
        if normalized < 1.0:
            small.append((i, normalized))
        else:
            large.append((i, normalized))
    
    # 处理小权重和大权重
    while small and large:
        small_idx, small_prob = small.pop()
        large_idx, large_prob = large.pop()
        
        # 将小权重的一部分与大权重结合
        aliases[small_idx] = (small_prob, large_idx)
        
        # 调整大权重的剩余量
        remaining = large_prob - (1.0 - small_prob)
        
        if remaining < 1.0:
            small.append((large_idx, remaining))
        else:
            large.append((large_idx, remaining))
    
    # 处理剩余的小权重或大权重
    while small:
        idx, prob = small.pop()
        aliases[idx] = (1.0, None)  # 概率为1,无别名
    
    while large:
        idx, prob = large.pop()
        aliases[idx] = (1.0, None)  # 概率为1,无别名
    
    def weighted_random():
        """
        加权随机选择函数
        """
        # 随机选择一个桶
        n = random.random() * count
        i = int(n)
        prob, alias = aliases[i]
        
        # 决定返回原始索引还是别名索引
        if (n - i) > prob and alias is not None:
            selected_idx = alias
        else:
            selected_idx = i
        
        return values[selected_idx], weights[selected_idx]
    
    return weighted_random
This post is licensed under CC BY 4.0 by the author.