This post documents DeepSpeed multi-node, multi-GPU experiments run on several servers, each with eight 1080Ti GPUs.
Supervised finetuning#
First, clone the deepspeed-chat repository on the master node.
The main environment used:
```bash
pip install torch==1.13.0
pip install datasets
pip install sentencepiece
pip install protobuf==3.20.3
pip install accelerate
pip install deepspeed==0.10.0
pip install transformers==4.44.2
pip install tensorboard
pip install numpy==1.26.4
```
Installing deepspeed requires nvcc. These 1080Ti servers did not have nvcc initially, so I installed it first:
```bash
sudo apt update
sudo apt install nvidia-cuda-toolkit
```
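To confirm the toolchain is visible before deepspeed tries to build any ops, a quick check helps (`ds_report` ships with deepspeed):

```bash
# Optional sanity check: nvcc must be on PATH, and ds_report lists which
# DeepSpeed C++/CUDA ops are compatible and can be JIT-compiled on this machine.
nvcc --version
ds_report
```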
After that I got single-node training working first, using step1_supervised_finetuning/training_scripts/opt/single_node/run_1.3b.sh, chosen because the 1080Ti has relatively little GPU memory; it later turned out that the bash scripts in the original repo are all roughly the same, and I only changed the model path.
Getting the single node to run also took quite a while. The first problems were the model and the dataset: the servers cannot reach Hugging Face directly, so I downloaded the opt-1.3b model to the master node, and likewise downloaded the synthetic-instruct-gptj-pairwise dataset from Hugging Face. Both are stored on the master node:
```
datasets
└── synthetic-instruct-gptj-pairwise
    ├── dataset_infos.json
    └── train-00000-of-00001-1e5d57b93c448e7a.parquet
```
The dataset class PromptRawDataset in dschat/utils/data/raw_datasets.py was modified accordingly:
```python
from datasets import load_dataset


class PromptRawDataset(object):

    def __init__(self, output_path, seed, local_rank, dataset_name):
        self.output_path = output_path
        self.seed = seed
        self.local_rank = local_rank
        # Load the raw data here; adjust this line to match your own local dataset.
        self.raw_datasets = load_dataset('parquet', data_files=dataset_name)
    ...
```
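For reference, a minimal sketch of the dataset subclass that consumes this parquet file, following the existing PromptRawDataset interface and the prompt/chosen/rejected fields of synthetic-instruct-gptj-pairwise (the class in the repo additionally handles the train/eval split, so treat this as an outline rather than the exact code):

```python
class DahoasSyntheticInstructGptjPairwiseDataset(PromptRawDataset):
    # Outline only: field names follow the synthetic-instruct-gptj-pairwise schema.

    def get_train_data(self):
        return self.raw_datasets["train"]

    def get_prompt(self, sample):
        return " Human: " + sample["prompt"] + " Assistant:"

    def get_chosen(self, sample):
        return " " + sample["chosen"]

    def get_rejected(self, sample):
        return " " + sample["rejected"]

    def get_prompt_and_chosen(self, sample):
        return " Human: " + sample["prompt"] + " Assistant: " + sample["chosen"]

    def get_prompt_and_rejected(self, sample):
        return " Human: " + sample["prompt"] + " Assistant: " + sample["rejected"]
```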
At this point the dataset, model, and environment were more or less ready. Launching the training script on a single node raised an optimizer error: the original training main function uses FusedAdam, and the failure was probably a g++ toolchain mismatch. I never resolved it and simply swapped the optimizer for AdamW, which ran fine. From what I have read, FusedAdam mainly offers an advantage in compute-heavy scenarios.
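The swap itself is a small change in step1's main.py. Roughly (the original selects DeepSpeedCPUAdam or FusedAdam depending on --offload; the exact lines vary by version, so this is a sketch):

```python
from torch.optim import AdamW

# Original (roughly): AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
# Replacement: plain PyTorch AdamW, which needs no fused C++/CUDA extension.
optimizer = AdamW(optimizer_grouped_parameters,
                  lr=args.learning_rate,
                  betas=(0.9, 0.95))
```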
Once single-node training worked, I moved on to multi-node training. First, every node needs the pdsh tool installed.
Second, the deepspeed launch command needs a --hostfile argument plus NCCL settings. The hostfile looks like this:
```
1.2.3.4 slots=8
1.2.3.5 slots=8
...
```
The first column is the node IP; the second is the number of GPUs on that node. The NCCL settings are:
```bash
OPTIONS_NCCL="NCCL_DEBUG=warn NCCL_SOCKET_IFNAME=enp59s0f0 NCCL_IB_GID_INDEX=3 NCCL_IB_HCA=mlx5_2:1,mlx5_2:1 NCCL_IB_SL=3 NCCL_CHECKS_DISABLE=1 NCCL_P2P_DISABLE=0 NCCL_LL_THRESHOLD=16384 NCCL_IB_CUDA_SUPPORT=1"
```
Here NCCL_SOCKET_IFNAME is a usable network interface on the server, which you can look up with the `ip addr show` command.
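With the pdsh launcher these NCCL variables have to reach every node, not just the shell on the master. One documented option (which I did not end up relying on, so treat it as untested here) is DeepSpeed's ~/.deepspeed_env file, whose VAR=value lines the launcher exports on each node before starting the workers:

```bash
# ~/.deepspeed_env -- one VAR=value per line, propagated to all nodes by the launcher
NCCL_DEBUG=warn
NCCL_SOCKET_IFNAME=enp59s0f0
NCCL_IB_GID_INDEX=3
```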
Then every worker node must have the same environment (see above) and the same code directory layout, and the model files must be present on every node (I simply packed the deepspeed directory and scp'd it to each node); the dataset files only need to exist on the master node. What stalled me longest was where the training environment lives on the worker nodes: at first I installed everything into a conda virtual environment on each node and launched the training script from inside that environment on the master node, but when the launcher reached the worker nodes it failed because it could not find the environment:
```
/usr/bin/python3: Error while finding module specification for 'deepspeed.launcher.launch' (ModuleNotFoundError: No module named 'deepspeed')
```
The problem is that when the launcher reaches a worker node it does not activate the corresponding conda virtual environment; installing the training environment under conda base on the workers did not help either. The eventual fix was to install the training dependencies into the default system Python environment (no conda base at all), after which everything ran immediately. There is presumably a way to make this work under conda as well; I will add it once I figure it out.
The final training script:
```bash
#!/bin/bash
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
OUTPUT=$1
ZERO_STAGE=$2
if [ "$OUTPUT" == "" ]; then
    OUTPUT=./output
fi
if [ "$ZERO_STAGE" == "" ]; then
    ZERO_STAGE=3
fi
mkdir -p $OUTPUT

num_nodes=3
num_gpus=8

OPTIONS_NCCL="NCCL_DEBUG=warn NCCL_SOCKET_IFNAME=enp59s0f0 NCCL_IB_GID_INDEX=3 NCCL_IB_HCA=mlx5_2:1,mlx5_2:1 NCCL_IB_SL=3 NCCL_CHECKS_DISABLE=1 NCCL_P2P_DISABLE=0 NCCL_LL_THRESHOLD=16384 NCCL_IB_CUDA_SUPPORT=1"

model_name_or_path=/home/iaiustc/sft-rlhf/models/opt-1.3
data_output_path=/home/iaiustc/sft-rlhf/output/cache_dir/ds_chat
batch_size=2
max_seq_len=512
learning_rate=9.65e-6
weight_decay=0.
num_train_epochs=1
gradient_accumulation_steps=4
lr_scheduler_type=cosine
num_warmup_steps=0
seed=1234
eval_interval=100
save_interval=100
print_interval=10

ARGS=" \
    --data_path /home/iaiustc/sft-rlhf/datasets/synthetic-instruct-gptj-pairwise/train-00000-of-00001-1e5d57b93c448e7a.parquet \
    --data_output_path ${data_output_path} \
    --data_split 2,4,4 \
    --model_name_or_path ${model_name_or_path} \
    --per_device_train_batch_size ${batch_size} \
    --per_device_eval_batch_size ${batch_size} \
    --max_seq_len ${max_seq_len} \
    --learning_rate ${learning_rate} \
    --weight_decay ${weight_decay} \
    --num_train_epochs ${num_train_epochs} \
    --gradient_accumulation_steps ${gradient_accumulation_steps} \
    --lr_scheduler_type ${lr_scheduler_type} \
    --num_warmup_steps ${num_warmup_steps} \
    --seed ${seed} \
    --zero_stage $ZERO_STAGE \
    --deepspeed \
    --enable_tensorboard \
    --tensorboard_path $OUTPUT \
    --output_dir $OUTPUT \
    --eval_interval ${eval_interval} \
    --save_interval ${save_interval} \
    --print_interval ${print_interval} \
    "

if [[ ${num_nodes} -gt 1 ]]; then
    # create hostfile if num_nodes > 1
    python create_hostfile.py
    hostfile_arg="--hostfile ./output/hostfile"
else
    hostfile_arg=""
fi

deepspeed --num_nodes ${num_nodes} --num_gpus ${num_gpus} \
    ${hostfile_arg} --master_port 12346 \
    main.py "$@" ${ARGS} 2>&1 | tee "${OUTPUT}/training2.log"
```
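The create_hostfile.py called above is not shown in this post; a minimal sketch that writes the hostfile format from earlier would look like this (the IPs are placeholders):

```python
# Minimal sketch of create_hostfile.py: write "<ip> slots=<gpus>" lines
# into ./output/hostfile. The IP list below is a placeholder.
import os

nodes = ["1.2.3.4", "1.2.3.5", "1.2.3.6"]  # master + worker IPs
slots = 8                                  # GPUs per node

os.makedirs("./output", exist_ok=True)
with open("./output/hostfile", "w") as f:
    for ip in nodes:
        f.write(f"{ip} slots={slots}\n")
```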
That wraps up multi-node, multi-GPU SFT training with deepspeed. The reward model and RLHF training should be similar to set up; I will update this post once they are done.
Reward Model#
The reward model is essentially the base model with a projection head on top; the projection head projects the hidden_states output by the base model's last layer down to one dimension. The adjustments needed for multi-node, multi-GPU training are therefore the same as for Supervised Finetuning, so this section mainly records the implementation details of the main member functions of the RewardModel class.
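For reference, the projection head (v_head) is just a bias-free linear layer producing one scalar per token position; a sketch, assuming hidden_size is read from the base model's config (the actual code handles OPT's word_embed_proj_dim specially):

```python
# Sketch of the projection head inside RewardModel.__init__:
# one scalar score per token position.
self.v_head = nn.Linear(hidden_size, 1, bias=False)
```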
1. The forward function#
DeepSpeed-Chat/dschat/utils/model/reward_model.py
The forward function is used in RM training to compute the training loss, along with the mean scores of the chosen and rejected data, which serve as a training reference metric. The RM training loss is
$$
\mathcal L_R=-\mathbb E_{(x,y_w, y_l)\sim\mathcal D}[\log \sigma(r_\phi(x, y_w)-r_\phi(x,y_l))]
$$
The implementation:
```python
class RewardModel(nn.Module):

    def __init__(self, ...):
        ...

    ...

    def forward(self, input_ids=None, ...):
        transformer_outputs = self.rwtransformer(...)
        # Last-layer hidden states.
        # hidden_states.shape: (bs*2, max_seq_len, hidden_size); the first half of the batch
        # is the chosen part, the second half is the rejected part.
        hidden_states = transformer_outputs[0]
        # rewards.shape: (bs*2, max_seq_len)
        rewards = self.v_head(hidden_states).squeeze(-1)

        bs = input_ids.shape[0] // 2
        seq_len = input_ids.shape[1]
        chosen_ids = input_ids[:bs]
        rejected_ids = input_ids[bs:]
        chosen_rewards = rewards[:bs]
        rejected_rewards = rewards[bs:]

        chosen_mean_scores = []
        rejected_mean_scores = []
        loss = 0.
        for i in range(bs):
            # (max_seq_len, )
            chosen_id = chosen_ids[i]
            rejected_id = rejected_ids[i]
            # (max_seq_len, )
            chosen_reward = chosen_rewards[i]
            rejected_reward = rejected_rewards[i]

            # Indices where chosen_id equals the pad token. For example, with
            # a = torch.tensor([1,2,3,0,0,0]), (a == 0).nonzero() is torch.tensor([[3],[4],[5]]);
            # with a = torch.tensor([[1,2,3,0,0,0]]), (a == 0).nonzero() is torch.tensor([[0,3],[0,4],[0,5]]).
            c_inds = (chosen_id == self.PAD_ID).nonzero()
            # c_ind is the index of the first pad token after the answer of chosen_sentence,
            # e.g. for chosen_id = torch.tensor([1,2,3,0,0,0]) we get c_ind = 3.
            # num_padding_at_beginning exists because OPT models put a fixed number (one) of
            # padding tokens (</s>, a bit like BERT) in front of the input; other autoregressive
            # models do not. The reference code uses OPT, so num_padding_at_beginning is 1.
            c_ind = c_inds[self.num_padding_at_beginning].item() if len(c_inds) > self.num_padding_at_beginning else seq_len

            check_divergence = (chosen_id != rejected_id).nonzero()
            # If the current chosen_sentence and rejected_sentence are identical,
            # only compute the loss at the final position for this pair.
            if len(check_divergence) == 0:
                end_ind = rejected_reward.size(-1)
                divergence_ind = end_ind - 1
                r_ind = c_ind
            else:
                r_inds = (rejected_id == self.PAD_ID).nonzero()
                r_ind = r_inds[self.num_padding_at_beginning].item() if len(r_inds) > self.num_padding_at_beginning else seq_len
                # end_ind is the larger of c_ind and r_ind, i.e. the last valid index for the loss.
                end_ind = max(c_ind, r_ind)
                # divergence_ind is the index of the first answer token at which chosen_sentence
                # and rejected_sentence differ, i.e. the first valid index for the loss.
                divergence_ind = check_divergence[0]
            assert divergence_ind > 0

            c_truncated_reward = chosen_reward[divergence_ind:end_ind]
            r_truncated_reward = rejected_reward[divergence_ind:end_ind]
            # The mean scores only keep the reward at the end of the valid answer.
            chosen_mean_scores.append(chosen_reward[c_ind - 1])
            rejected_mean_scores.append(rejected_reward[r_ind - 1])

            loss += -torch.nn.functional.logsigmoid(c_truncated_reward - r_truncated_reward).mean()

        loss = loss / bs
        # (bs, )
        chosen_mean_scores = torch.stack(chosen_mean_scores)
        rejected_mean_scores = torch.stack(rejected_mean_scores)
        return {
            "loss": loss,
            "chosen_mean_scores": chosen_mean_scores,
            "rejected_mean_scores": rejected_mean_scores,
        }
```
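A toy illustration of the per-pair loss, with made-up truncated rewards, just to show that the loss shrinks as the chosen rewards pull ahead of the rejected ones:

```python
import torch
import torch.nn.functional as F

# Hypothetical per-token rewards over the divergence..end span of one pair.
c_truncated_reward = torch.tensor([0.8, 1.1, 0.9])   # chosen
r_truncated_reward = torch.tensor([0.2, -0.1, 0.3])  # rejected

pair_loss = -F.logsigmoid(c_truncated_reward - r_truncated_reward).mean()
print(pair_loss)  # ~0.38; approaches 0 as the chosen margin grows
```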
2. The forward_value function#
DeepSpeed-Chat/dschat/utils/model/reward_model.py
The forward_value function is mainly used in the RLHF stage, where the reward model and the critic model call it to compute rewards and values in the forward pass; the input_ids here are therefore no longer half chosen and half rejected, but the sequences generated from the prompts. The implementation:
```python
class RewardModel(nn.Module):

    def __init__(self, ...):
        ...

    ...

    def forward_value(self, input_ids, ...):
        transformer_outputs = self.rwtransformer(input_ids, ...)
        hidden_states = transformer_outputs[0]
        # (bs, max_seq_len)
        values = self.v_head(hidden_states).squeeze(-1)
        if return_value_only:
            return values
        else:
            assert prompt_length > 1, "prompt_length must be greater than 1 to help select the end score"
            bs = values.size(0)
            seq_len = input_ids.shape[1]
            chosen_end_scores = []
            for i in range(bs):
                input_id = input_ids[i]
                value = values[i]
                # c_ind has the same meaning as in forward: the index of the first pad token
                # after the valid answer. The prompt_length prefix is skipped before searching
                # and added back afterwards because in the RLHF stage prompts also come from a
                # dataloader in batches, so the prompt itself contains padding (padding +
                # true prompt); skipping it avoids picking up the prompt's padding.
                c_inds = (input_id[prompt_length:] == self.PAD_ID).nonzero()
                c_ind = c_inds[0].item() + prompt_length if len(c_inds) > 0 else seq_len
                chosen_end_scores.append(value[c_ind - 1])
            return {
                "values": values,                                     # (bs, max_seq_len)
                "chosen_end_scores": torch.stack(chosen_end_scores),  # (bs,)
            }
```
RLHF#
Getting the RLHF stage to run uses the same setup as Supervised Finetuning and Reward Model training; with those working, this stage basically runs after changing the input arguments, so this section mainly records the implementation details of deepspeed's RLHF code. The main functions all live in the DeepSpeedPPOTrainer class in the DeepSpeed-Chat/dschat/rlhf/ppo_trainer.py script.
1. The _generate_sequence function#
_generate_sequence takes a batch (bs) of prompts and generates sequences with batch size seq_bs. Its main extra step is filtering on the length of the generated answer: sequences whose answer part has length less than or equal to 1 are dropped, which is why the output batch dimension becomes seq_bs. A rough sketch of the filter is shown below.
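A paraphrased sketch of that filter (not the exact repo code): the answer part is compared against the pad token and too-short answers are dropped:

```python
# Paraphrased sketch of the answer-length filter in _generate_sequence.
prompt_length = prompts.shape[1]
ans = seq[:, prompt_length:]                                  # answer tokens only
valid_ans_len = (ans != self.tokenizer.pad_token_id).sum(dim=-1)
keep = [i for i in range(seq.shape[0]) if valid_ans_len[i] > 1]
if len(keep) == 0:
    return None               # caller falls back to the last generated experience
seq = seq[keep]               # (seq_bs, max_seq_len) with seq_bs <= bs
```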
2. The generate_experience function#
generate_experience produces the experience data. Its input is a batch (bs) of prompts; its output contains the reference model's logprobs, the actor model's logprobs, the reward model's rewards, the critic model's values, and the sequences' input_ids and attention_mask.
```python
def generate_experience(self, prompts, mask, step):
    # Put all four models into eval mode.
    self.eval()
    seq = self._generate_sequence(prompts, mask, step)
    if seq is None:
        assert self.last_generated_experience is not None, f'Invalid generated experience at step={step}'
        prompts = self.last_generated_experience['prompts']
        seq = self.last_generated_experience['seq']
    else:
        self.last_generated_experience = {'prompts': prompts, 'seq': seq}
    # Put the actor model and critic model back into train mode.
    self.train()

    pad_token_id = self.tokenizer.pad_token_id
    attention_mask = seq.not_equal(pad_token_id).long()
    with torch.no_grad():
        output = self.actor_model(seq, attention_mask=attention_mask)
        output_ref = self.ref_model(seq, attention_mask=attention_mask)
        reward_score = self.reward_model.forward_value(seq, attention_mask, prompt_length=self.prompt_length)['chosen_end_scores'].detach()
        values = self.critic_model.forward_value(seq, attention_mask, return_value_only=True).detach()[:, :-1]

    logits = output.logits
    logits_ref = output_ref.logits

    return {
        'prompts': prompts,                                                   # (bs, max_prompt_len)
        'logprobs': gather_log_probs(logits[:, :-1, :], seq[:, 1:]),          # (seq_bs, max_seq_len - 1)
        'ref_logprobs': gather_log_probs(logits_ref[:, :-1, :], seq[:, 1:]),  # (seq_bs, max_seq_len - 1)
        'value': values,                                                      # (seq_bs, max_seq_len - 1)
        'rewards': reward_score,                                              # (seq_bs, )
        'input_ids': seq,                                                     # (seq_bs, max_seq_len)
        'attention_mask': attention_mask                                      # (seq_bs, max_seq_len)
    }
```
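The gather_log_probs helper used above comes from the repo's utilities; for reference it does roughly the following, picking out the log-probability assigned to each realized next token:

```python
import torch.nn.functional as F

def gather_log_probs(logits, labels):
    # logits: (bs, seq_len, vocab), labels: (bs, seq_len)
    log_probs = F.log_softmax(logits, dim=-1)
    # Select the log-prob of each actually generated token.
    log_probs_labels = log_probs.gather(dim=-1, index=labels.unsqueeze(-1))
    return log_probs_labels.squeeze(-1)  # (bs, seq_len)
```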
3. The compute_rewards function#
First, the reward formula used in RLHF:
$$
r_{KL}=r(x,y)-\beta \log\frac{\pi^{RL}_{old}(y|x)}{\pi^{SFT}(y|x)}
$$
The implementation:
```python
def compute_rewards(self, prompts, log_probs, ref_log_probs, reward_score, action_mask):
    kl_divergence_estimate = -self.kl_ctl * (log_probs - ref_log_probs)
    rewards = kl_divergence_estimate                  # (bs, max_seq_len - 1)
    start = prompts.shape[1] - 1
    # ends holds the end index of the valid tokens for each sample in the batch (an array).
    # The prompt part is handled separately here because the prompts also contain padding.
    ends = start + action_mask[:, start:].sum(1) + 1
    # Clip the reward produced by the RM to a fixed range.
    reward_clip = torch.clamp(reward_score, -self.clip_reward_value, self.clip_reward_value)
    batch_size = log_probs.shape[0]
    # Add the clipped sequence-level RM reward onto the last valid token's KL reward.
    for j in range(batch_size):
        rewards[j, start:ends[j]][-1] += reward_clip[j]

    return rewards                                    # (bs, max_seq_len - 1)
```
4. The actor_loss_fn function#
Within one ppo_batch, the actor loss is
$$
pg\_loss=\mathbb E_{\tau\sim\pi_{old}^{RL}}\mathbb E_{(s_t,a_t)\sim\tau}[\max(-\hat A_t\cdot\frac{p_{new}^{RL}(a_t|s_t)}{p_{old}^{RL}(a_t|s_t)},-\hat A_t\cdot \mathrm{clip}(\frac{p_{new}^{RL}(a_t|s_t)}{p_{old}^{RL}(a_t|s_t)},1-\epsilon,1+\epsilon))]
$$
where $\tau$ covers only the "answer" part, not the "prompt" part.
```python
def actor_loss_fn(self, logprobs, old_logprobs, advantages, mask):
    # Policy gradient loss.
    # Importance sampling ratio: ratio = exp(log(new) - log(old))
    log_ratio = (logprobs - old_logprobs) * mask
    ratio = torch.exp(log_ratio)
    pg_loss1 = -advantages * ratio
    pg_loss2 = -advantages * torch.clamp(ratio, 1.0 - self.cliprange, 1.0 + self.cliprange)
    pg_loss = torch.sum(torch.max(pg_loss1, pg_loss2) * mask) / mask.sum()
    return pg_loss
```
5. The critic_loss_fn function#
Within one ppo_batch, the critic loss is computed in two steps. 1) Clip the new value estimate $V_{new}$ so it does not drift too far from the old value estimate recorded when the experience was collected, keeping experience replay effective:
$$
V_{clip}=\mathrm{clip}(V_{new}, V_{old}-\phi,V_{old}+\phi)
$$
2) Fit the critic to the return $R$:
$$
vf\_loss=\frac{1}{2}\cdot \mathbb E_{\tau\sim\pi_{old}^{RL}}\mathbb E_{s_t\sim\tau}[\max((V_{new}(s_t)-R_t)^2, (V_{clip}(s_t)-R_t)^2)]
$$
where $\tau$ covers only the "answer" part, not the "prompt" part.
```python
def critic_loss_fn(self, values, old_values, returns, mask):
    # value loss
    values_clipped = torch.clamp(values, old_values - self.cliprange_value, old_values + self.cliprange_value)
    vf_loss1 = (values - returns) ** 2
    vf_loss2 = (values_clipped - returns) ** 2
    vf_loss = 0.5 * torch.sum(torch.max(vf_loss1, vf_loss2) * mask) / mask.sum()
    return vf_loss
```
6. The get_advantages_and_returns function#
Computing the advantages: in this framework, as in most others, the advantage is not a pure TD error but combines the TD error with the MC method, i.e. GAE (Generalized Advantage Estimation). Concretely, for a trajectory of total length $T$, the advantage at time step $t$ is (with $\lambda=1$ the advantage is purely MC; with $\lambda=0$ it is purely TD error):
$$
\hat A_t=\delta_t+(\gamma\lambda)\delta_{t+1}+(\gamma\lambda)^2\delta_{t+2}+\dots+(\gamma\lambda)^{T-t-1}\delta_{T-1}\\
\text{where}\ \delta_t=r_{KL,t}+\gamma\cdot V_{old}(s_{t+1})-V_{old}(s_t)
$$
Computing the returns: the return is the accumulated reward, and for a trajectory of total length $T$, the return at time step $t$ is recovered from the advantage as:
$$
R_t=\hat A_t+V_t
$$
```python
def get_advantages_and_returns(self, values, rewards, start):
    lastgaelam = 0
    advantages_reversed = []
    length = rewards.size(-1)
    # Walk backwards over the time steps to accumulate the advantages.
    for t in reversed(range(start, length)):
        # Value estimate of the next time step, V_old(s_{t+1}).
        nextvalues = values[:, t + 1] if t < length - 1 else 0.0
        # One-step TD error.
        delta = rewards[:, t] + self.gamma * nextvalues - values[:, t]
        # Accumulate the advantage (GAE recursion).
        lastgaelam = delta + self.gamma * self.lam * lastgaelam
        # Store the advantage of each time step.
        advantages_reversed.append(lastgaelam)
    # Reverse the list so the advantages follow normal time order.
    advantages = torch.stack(advantages_reversed[::-1], dim=1)  # (seq_bs, max_seq_len - 1 - start)
    # return_t = adv_t + V(s_t): the returns are recovered from the advantages.
    returns = advantages + values[:, start:]                    # (seq_bs, max_seq_len - 1 - start)
    return advantages.detach(), returns
```
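A tiny numeric check of the recursion on a made-up 3-step answer (start=0 for brevity), assuming gamma=1.0 and lam=0.95, which I believe are the values hardcoded in the trainer:

```python
import torch

gamma, lam = 1.0, 0.95                      # assumed trainer defaults
rewards = torch.tensor([[0.0, 0.0, 1.0]])   # KL-rewards; RM score added on the last token
values  = torch.tensor([[0.3, 0.5, 0.6]])   # old critic estimates

lastgaelam, advs = 0.0, []
for t in reversed(range(3)):
    nextvalue = values[:, t + 1] if t < 2 else 0.0
    delta = rewards[:, t] + gamma * nextvalue - values[:, t]
    lastgaelam = delta + gamma * lam * lastgaelam
    advs.append(lastgaelam)

advantages = torch.stack(advs[::-1], dim=1)
returns = advantages + values
print(advantages)  # tensor([[0.6560, 0.4800, 0.4000]])
print(returns)     # tensor([[0.9560, 0.9800, 1.0000]])
```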
7. The train_rlhf function#
```python
def train_rlhf(self, inputs):
    # inputs is one ppo_batch of the values returned by generate_experience.
    prompts = inputs['prompts']
    log_probs = inputs['logprobs']
    ref_log_probs = inputs['ref_logprobs']
    reward_score = inputs['rewards']
    values = inputs['value']
    attention_mask = inputs['attention_mask']
    seq = inputs['input_ids']

    start = prompts.size()[-1] - 1          # max_prompt_len - 1
    action_mask = attention_mask[:, 1:]     # (ppo_bs, max_seq_len - 1)

    # Use the old logprobs, ref_logprobs and reward_score from the experience buffer to compute
    # the KL-reward, then use the KL-reward and the old values to compute advantages and returns.
    old_values = values                     # (ppo_bs, max_seq_len - 1)
    with torch.no_grad():
        # old_rewards: (ppo_bs, max_seq_len - 1)
        old_rewards = self.compute_rewards(prompts, log_probs, ref_log_probs, reward_score, action_mask)
        ends = start + action_mask[:, start:].sum(1) + 1
        # Zero out the padding part of the rewards and values, otherwise the
        # advantage/return computation would be wrong.
        for i in range(old_rewards.shape[0]):
            old_rewards[i, ends[i]:] = 0
            old_values[i, ends[i]:] = 0
        # advantages: (ppo_bs, max_seq_len - max_prompt_len)
        # returns:    (ppo_bs, max_seq_len - max_prompt_len)
        advantages, returns = self.get_advantages_and_returns(old_values, old_rewards, start)

    batch = {'input_ids': seq, 'attention_mask': attention_mask}
    # Compute the latest logprobs with the current actor model, compute actor_loss,
    # and update the actor model parameters.
    actor_prob = self.actor_model(**batch, use_cache=False).logits
    actor_log_prob = gather_log_probs(actor_prob[:, :-1, :], seq[:, 1:])
    actor_loss = self.actor_loss_fn(actor_log_prob[:, start:], log_probs[:, start:], advantages, action_mask[:, start:])
    self.actor_model.backward(actor_loss)
    self.actor_model.step()

    # Compute the latest values with the current critic model, compute critic_loss, and update
    # the critic model parameters, completing the training on one ppo batch of data.
    value = self.critic_model.forward_value(**batch, return_value_only=True, use_cache=False)[:, :-1]
    critic_loss = self.critic_loss_fn(value[:, start:], old_values[:, start:], returns, action_mask[:, start:])
    self.critic_model.backward(critic_loss)
    self.critic_model.step()

    return actor_loss, critic_loss
```
PPO training data management: MiniDataset#
/DeepSpeed-Chat/dschat/utils/data/data_utils.py
MiniDataset is a class that further splits the data used during PPO training.
```python
class MiniDataset:

    def __init__(self, max_size, small_batch_size):
        # max_size is the number of normal batches collected before the PPO training data is
        # split. For example, max_size=2 with batch=4 means the split into ppo batches starts
        # once the dataset holds two batches (8 samples).
        # small_batch_size is the batch size used for PPO training, i.e. ppo_batch.
        self.dataset = []
        self.max_size = max_size
        self.small_batch_size = small_batch_size

    def seperate(self):
        # Once len(self.dataset) reaches max_size, split into ppo batches.
        # Suppose max_size=2, small_batch_size=3, normal batch_size=4.
        # Before splitting: self.dataset = [[d0,d1,d2,d3], [d4,d5,d6,d7]];
        # after splitting:  small_dataset = [[d0,d1,d2], [d3], [d4,d5,d6], [d7]].
        small_dataset = []
        for large_batch in self.dataset:
            if type(large_batch) == list or type(large_batch) == tuple:
                # large_size is the normal batch size.
                large_size = len(large_batch[0])
            elif type(large_batch) == dict:
                large_size = len(large_batch[list(large_batch.keys())[0]])
            else:
                large_size = len(large_batch)
            for i in range(0, large_size, self.small_batch_size):
                if type(large_batch) == list or type(large_batch) == tuple:
                    small_dataset.append([x[i:i + self.small_batch_size] for x in large_batch])
                elif type(large_batch) == dict:
                    small_dataset.append({k: v[i:i + self.small_batch_size] for k, v in large_batch.items()})
                else:
                    small_dataset.append(large_batch[i:i + self.small_batch_size])
        self.free()
        return small_dataset

    def add(self, data):
        if len(self.dataset) < self.max_size:
            self.dataset.append(data)
            if len(self.dataset) == self.max_size:
                return self.seperate()
            else:
                return None
        else:
            raise ValueError('xx')

    def free(self):
        self.dataset = []
```
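A quick toy run, with plain Python lists standing in for the experience batches, to show the add/seperate behavior described in the comments:

```python
mini = MiniDataset(max_size=2, small_batch_size=3)

print(mini.add([list(range(4))]))     # None: only 1 of 2 normal batches collected so far
out = mini.add([list(range(4, 8))])   # 2nd batch arrives -> split into ppo batches
print(out)                            # [[[0, 1, 2]], [[3]], [[4, 5, 6]], [[7]]]
print(mini.dataset)                   # [] (freed after the split)
```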
The main training loop in main.py#
DeepSpeed-Chat/training/step3_rlhf_finetuning/main.py
Ignoring the unsupervised data, this is the flow of the main RLHF training loop:
```python
exp_mini_dataset = MiniDataset(args.generation_batches, args.per_device_training_batch_size)

for epoch in range(args.num_train_epochs):
    for step, batch_prompt in enumerate(prompt_train_dataloader):
        batch_prompt = to_device(batch_prompt, device)
        # Generate the experience data for the current batch of prompts.
        out = trainer.generate_experience(batch_prompt['prompt'], batch_prompt['prompt_att_mask'], step)
        # Add the current batch of experience; once args.generation_batches batches have been
        # collected they are split into ppo_batch data for training, otherwise keep collecting.
        exp_dataset = exp_mini_dataset.add(out)
        if exp_dataset is not None:
            inner_iter = 0
            actor_loss_sum, critic_loss_sum = 0, 0
            average_reward = 0
            for ppo_ep in range(args.ppo_epochs):
                for i, exp_data in enumerate(exp_dataset):
                    actor_loss, critic_loss = trainer.train_rlhf(exp_data)
                    actor_loss_sum += actor_loss.item()
                    critic_loss_sum += critic_loss.item()
                    average_reward += exp_data['rewards'].mean()
                    inner_iter += 1
                random.shuffle(exp_dataset)
            print_rank_0(f"{epoch} | {step} | {ppo_ep+1} | {actor_loss_sum / inner_iter} | {critic_loss_sum / inner_iter} | {average_reward / inner_iter}")
```
In summary: every args.generation_batches batches, the current {actor, ref, critic, reward} models generate a batch of experience data; that experience is split into ppo_batch training data and trained on for args.ppo_epochs epochs, with one parameter update of the {actor, critic} models per inner_iter of each ppo_epoch; after all ppo_epochs on the current experience are finished, the average {actor_loss, critic_loss, average_reward} is printed. One outer epoch ends once all prompts in prompt_train_dataloader have been consumed, and this whole loop runs args.num_train_epochs times.
References#
[1] InstructGPT高效实践——【DeepSpeed-Chat】源码详解(2/3):Supervised Finetuning、Reward Model Finetuning
[2] InstructGPT高效实践——【DeepSpeed-Chat】源码详解(3/3):RLHF Finetuning