掘金 人工智能 20小时前
【代码走读】DETR-Facebook AI-ECCV 2020
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入解析了DETR模型的核心代码逻辑,详细介绍了如何端到端地解决目标检测问题。从模型构建、backbone(如ResNet50)与位置编码(Sine)的结合,到Transformer编码器和解码器的结构与数据流转,再到最后的分类和边界框预测,以及匈牙利算法在目标与预测匹配中的应用,都进行了详尽的阐述。文章还重点讲解了分类损失和边界框损失的计算方式,为理解DETR的端到端检测机制提供了清晰的指导。

🌟 DETR模型通过Transformer实现了目标检测的端到端处理,消除了传统检测器中的NMS等人工设计组件,简化了后处理流程。核心在于利用Transformer的自注意力和交叉注意力机制,直接将图像特征与一组可学习的查询(queries)进行交互,预测目标框和类别。

⚙️ 模型构建流程清晰,包括backbone(如ResNet50)提取多尺度特征,并结合Sine位置编码(借鉴了傅里叶思想,高频区分近处,低频区分远处)生成位置信息。Joiner模块将backbone特征与位置编码进行整合,为Transformer输入做准备。

🧠 Transformer编码器和解码器是DETR的核心。编码器负责处理图像特征,通过多层自注意力机制捕捉全局信息;解码器则利用自注意力和交叉注意力,将查询 embedding 与编码器输出的图像特征进行交互,逐步细化目标预测。query embedding的引入是DETR的一大亮点。

🎯 匈牙利算法在DETR中扮演关键角色,用于匹配模型预测的边界框和类别与真实目标。通过计算分类损失(交叉熵)、L1损失和GIOU损失构成的代价矩阵,再利用匈牙利算法找到最优匹配,为后续损失计算和模型训练提供了依据。

📊 损失计算包括分类损失(交叉熵)和边界框损失(L1+GIOU)。分类损失通过将预测类别概率与匹配到的真实类别进行交叉熵计算得出,边界框损失则综合考虑了L1距离和GIOU,以实现对目标位置和尺寸的精确回归。辅助损失(aux_loss)也支持了训练过程。

概述

DETR 通过 transformer 端到端地解决目标检测问题,消除了传统检测器中的人工设计组件,避免部分繁杂的后处理逻辑,例如常见的 NMS 等等。本文记录 DETR 的核心代码逻辑走读。

build model & train forward

构造完整模型的调用栈:

build_model  -> build    -> build_backbone    -> build_transformer    -> DETR or DETRsegm    -> SetCriterion    -> PostProcessor

build_backbone

build_backbone  -> build_position_encoding  -> join Joiner and position_embedding

根据默认 args 配置,backbone 选用了 resnet50;position_embedding 默认为 sine(即 DETRv2 版本)

PositionEmbeddingSine

def forward(self, tensor_list: NestedTensor):    # 1. 输入完整特征图    x = tensor_list.tensors    mask = tensor_list.mask    assert mask is not None    # 2. 取其中未被 mask 的部分    not_mask = ~mask    # 3. 计算累积和得到坐标信息    y_embed = not_mask.cumsum(1, dtype=torch.float32)    # 垂直方向累加    x_embed = not_mask.cumsum(2, dtype=torch.float32)    # 水平方向累加    # 4. 将坐标归一化到 [0,scale] 范围,scale 默认为 2pi    if self.normalize:        eps = 1e-6        y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale        x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale    """    5. 位置编码中的频率生成部分    """    # 5.1 生成一个序列    dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)    # 假设 num_pos_feats=64, 结果是: [0,1,2,3,...,63]    dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)    # 应用这个公式进一步生成一组指数递增的频率值,低位索引对应低频率(数值小),高位索引对应高频率(数值大)    # 6. 生成正弦波    # 假设 num_pos_feats = 4 时,简化后的频率值 dim_t = [1, 1, 100, 100]    pos_x = x_embed[:, :, :, None] / dim_t    # 假设 x_embed = 5,pos_x = [5/1, 5/1, 5/100, 5/100] = [5, 5, 0.05, 0.05]    pos_y = y_embed[:, :, :, None] / dim_t    # 假设 y_embed= 5,pos_y = [5/1, 5/1, 5/100, 5/100] = [5, 5, 0.05, 0.05]    pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)    # [sin(5), cos(5), sin(0.05), cos(0.05)]    pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)    # [sin(5), cos(5), sin(0.05), cos(0.05)]    pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)    return pos

PositionEmbeddingSine 的思路来源于《Attention is all you need》,同时有点类似于傅里叶中思想,既能区分相近位置(通过高频部分),又能区分远处位置(通过低频部分):

因为直接对5取sin/cos,位置稍微变化(比如5.1),这部分值变化就会很大,用于精确编码相近位置的差异

因为5除以了一个大数(比如100)得到0.05,位置变化不大时(比如5.1/100),这部分值变化很小用于粗略编码远处位置的差异

Backbone

默认 ResNet50,如果 return_interm_layers=True,backbone 会返回 ResNet 的 4 个 stage 的特征图:

layer1: C=256, H/4, W/4 (第一个stage)layer2: C=512, H/8, W/8 (第二个stage)layer3: C=1024, H/16, W/16 (第三个stage)layer4: C=2048, H/32, W/32 (第四个stage)# Backbone out:out = {    "0": NestedTensor(feature1, mask1),  # layer1的输出    "1": NestedTensor(feature2, mask2),  # layer2的输出    "2": NestedTensor(feature3, mask3),  # layer3的输出    "3": NestedTensor(feature4, mask4)   # layer4的输出}

Joiner

class Joiner(nn.Sequential):    def __init__(self, backbone, position_embedding):        super().__init__(backbone, position_embedding)        # self[0] 是 backbone        # self[1] 是 position_embedding            def forward(self, tensor_list: NestedTensor):        # 1. 首先通过backbone得到多尺度特征        xs = self[0](tensor_list)  # 得到dict形式的特征图                # 2. 准备两个列表存结果        out: List[NestedTensor] = []  # 存特征图        pos = []  # 存位置编码                # 3. 对每个尺度的特征        for name, x in xs.items():            out.append(x)  # 保存特征图            # 对每个特征图计算位置编码            pos.append(self[1](x).to(x.tensors.dtype))                # 4. 返回特征图和对应的位置编码        return out, pos

build_transformer

TransformerEncoderLayer

以 Post-Norm 为例,核心的结构以及 forward:

class TransformerEncoderLayer(nn.Module):    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,                 activation="relu", normalize_before=False):        super().__init__()        self.self_attn = nn.MultiheadAttention(            d_model=d_model,    # 输入特征维度            nhead=nhead,        # 8个注意力头            dropout=dropout     # 注意力dropout        )        # 每个头的维度是: d_model/nhead        # 两层 MLP,中间升维再降维        self.linear1 = nn.Linear(d_model, dim_feedforward)   # 升维        self.dropout = nn.Dropout(dropout)                 # FFN的dropout        self.linear2 = nn.Linear(dim_feedforward, d_model)   # 降维        self.norm1 = nn.LayerNorm(d_model)   # 自注意力后的LayerNorm        self.norm2 = nn.LayerNorm(d_model)   # FFN后的LayerNorm        self.dropout1 = nn.Dropout(dropout)  # 自注意力的残差dropout        self.dropout2 = nn.Dropout(dropout)  # FFN的残差dropout        self.activation = _get_activation_fn(activation)        self.normalize_before = normalize_before    def with_pos_embed(self, tensor, pos: Optional[Tensor]):        return tensor if pos is None else tensor + pos    def forward_post(self,                     src,                     src_mask: Optional[Tensor] = None,                     src_key_padding_mask: Optional[Tensor] = None,                     pos: Optional[Tensor] = None):        # 1. 自注意力        q = k = self.with_pos_embed(src, pos)   # q,k加入位置编码        src2 = self.self_attn(            q, k, value=src,                    # value不加位置编码            attn_mask=src_mask,            key_padding_mask=src_key_padding_mask        )[0]        src = src + self.dropout1(src2)         # 残差连接        src = self.norm1(src)                   # LayerNorm        # 2. FFN        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))        src = src + self.dropout2(src2)         # 残差连接        src = self.norm2(src)                   # LayerNorm                return src

这边 forward_post 和 forward_pre 没有本质上的区别,只是 LayerNorm 顺序的不同:

Post-Norm(原始Transformer设计)Pre-Norm(改进设计)
LayerNorm在残差连接后训练更稳定(浅层网络)但深层网络可能有梯度问题LayerNorm在残差连接前训练不太稳定但更适合深层网络梯度传播更容易

TransformerEncoder

套了多层 layer 的 TransformerEncoderLayer,每层之间也是残差连接。

TransformerDecoderLayer

class TransformerDecoderLayer(nn.Module):    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,                 activation="relu", normalize_before=False):        super().__init__()        # 1. 自注意力层(Self-Attention)        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)        # 2. 交叉注意力层(Cross-Attention)        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)        # 3. 前馈神经网络(Feedforward)        self.linear1 = nn.Linear(d_model, dim_feedforward)        self.dropout = nn.Dropout(dropout)        self.linear2 = nn.Linear(dim_feedforward, d_model)        self.norm1 = nn.LayerNorm(d_model)   # self-attn后LayerNorm        self.norm2 = nn.LayerNorm(d_model)   # cross-attn后LayerNorm        self.norm3 = nn.LayerNorm(d_model)   # FFN后LayerNorm        self.dropout1 = nn.Dropout(dropout)  # self-attn的残差dropout        self.dropout2 = nn.Dropout(dropout)  # cross-attn的残差dropout        self.dropout3 = nn.Dropout(dropout)  # FFN的残差dropout        self.activation = _get_activation_fn(activation)        self.normalize_before = normalize_before    def with_pos_embed(self, tensor, pos: Optional[Tensor]):        return tensor if pos is None else tensor + pos    def forward_post(self, tgt, memory,                     tgt_mask: Optional[Tensor] = None,                     memory_mask: Optional[Tensor] = None,                     tgt_key_padding_mask: Optional[Tensor] = None,                     memory_key_padding_mask: Optional[Tensor] = None,                     pos: Optional[Tensor] = None,                     query_pos: Optional[Tensor] = None):        # 1. Self-Attention        q = k = self.with_pos_embed(tgt, query_pos)                     # Q和K相同,加入query位置编码        tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,      # V是原始输入                              key_padding_mask=tgt_key_padding_mask)[0]        tgt = tgt + self.dropout1(tgt2)        tgt = self.norm1(tgt)        # 2. Cross-Attention        tgt2 = self.multihead_attn(            query=self.with_pos_embed(tgt, query_pos),  # Q来自前面自注意力后得到的tgt,加入query位置编码            key=self.with_pos_embed(memory, pos),       # K来自encoder,加入encoder位置编码            value=memory,                               # V来自encoder,不加位置编码            attn_mask=memory_mask,            key_padding_mask=memory_key_padding_mask        )[0]        tgt = tgt + self.dropout2(tgt2)        tgt = self.norm2(tgt)        # 3. FFN        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))        tgt = tgt + self.dropout3(tgt2)        tgt = self.norm3(tgt)        return tgt

自注意力的作用是:让每个 query 位置能看到其他所有 query 位置的信息,处理 query 之间的关系。

交叉注意力的作用是:让每个 query 位置去关注 encoder 输出的不同位置,实现 decoder 对输入图像的选择性关注。

TransformerDecoder

套了多层 layer 的 TransformerDecoderLayer,每层之间也是残差连接。

DETR backbone -> transformer 数据流

    def forward(self, samples: NestedTensor):        # 这一节只梳理 backbone 到 transformer 输出的部分        if isinstance(samples, (list, torch.Tensor)):            samples = nested_tensor_from_tensor_list(samples)        # 1. backbone输出        features, pos = self.backbone(samples)        # features: 多尺度特征图列表[f1,f2,f3,f4]        # pos: 对应的位置编码列表[p1,p2,p3,p4]        # 2. 取最后一层特征        src, mask = features[-1].decompose()        # src: [B,C,H,W] 最后一层特征图        # mask: [B,H,W] padding mask        assert mask is not None        # 3. 输入transformer        # self.input_proj(src) [B,C,H,W] -> [B,hidden_dim,H,W]        hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]        # 以下在后续章节补充        outputs_class = self.class_embed(hs)        outputs_coord = self.bbox_embed(hs).sigmoid()        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}        if self.aux_loss:            out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord)        return out

最终输入给 transformer 的内容:

    特征维度调整后的特征图padding maskquery embedding:nn.Embedding(num_queries, hidden_dim)最后一层特征的位置编码

transformer 中进一步的数据流转:

def forward(self, src, mask, query_embed, pos_embed):    # 1. flatten NxCxHxW to HWxNxC    bs, c, h, w = src.shape    src = src.flatten(2).permute(2, 0, 1)    # 2. 位置编码维度变换做了同样处理    pos_embed = pos_embed.flatten(2).permute(2, 0, 1)    # 3. 准备query_embed [N,C] -> [N,1,C] -> [N,B,C]    query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)    mask = mask.flatten(1)    tgt = torch.zeros_like(query_embed)    # 4. encoder输入    # src: [HW,B,C] 图像特征    # mask: [B,HW] padding mask    # pos_embed: [HW,B,C] 位置编码    memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)    # 5. decoder输入    # tgt: [N,B,C] 初始全0    # memory: [HW,B,C] encoder输出的图像特征    # mask: [B,HW] padding mask    # sin/cos pos_embed: [HW,B,C] 图像位置编码    # query_embed 作为 decoder 的 query_pos: [N,B,C] query位置编码    hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,                      pos=pos_embed, query_pos=query_embed)    # 6. 输出    # hs: [L,N,B,C] 解码器输出,记得这里L是decoder层数,N是query数,B是batch size,C是query特征维度    # memory: [HW,B,C] 图像特征    return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)    # hs.transpose(1, 2): [L,B,N,C]    # memory.permute(1, 2, 0).view(bs, c, h, w): [B,C,H,W] 

尽管 tgt 每个 loop 初始都是0,但是能以每次已经过前一次学习的 query_pos 为起点,每个 layer 残差学习为路径,将这个过程中更新的 tgt 作为输出。

class 和 bbox 预测

outputs_class = self.class_embed(hs)    # [L,B,N,num_classes+1]# 其中:self.class_embed = nn.Linear(hidden_dim, num_classes + 1)# hidden_dim -> num_classes + 1

直接用线性层做分类预测,num_classes + 1 中的 +1 是 "no object" 类,每个 query 要么预测一个物体类别,要么预测 "no object"。

outputs_coord = self.bbox_embed(hs).sigmoid()    # [L,B,N,4]# 其中:self.bbox_embed = MLP(    input_dim=hidden_dim,    hidden_dim=hidden_dim,    output_dim=4,  # (x,y,w,h)    num_layers=3   # 3层MLP)

边界框预测更复杂,因此用 MLP 而不是 Linear,3 层 MLP 提供非线性变换能力,输出 4 个值对应 (x,y,w,h),最后用 sigmoid 归一化到 [0, 1] 范围。

out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}

最终的分类和回归的 output 都只取了 decoder 的最后一层的结果。

SetCriterion

    def forward(self, outputs, targets):        """ This performs the loss computation.        Parameters:             outputs: dict of tensors, see the output specification of the model for the format             targets: list of dicts, such that len(targets) == batch_size.                      The expected keys in each dict depends on the losses applied, see each loss' doc        """        # 分离出非辅助输出        outputs_without_aux = {k: v for k, v in outputs.items() if k != 'aux_outputs'}        # 获取输出和真实标签之间的匹配【核心】        # Retrieve the matching between the outputs of the last layer and the targets        indices = self.matcher(outputs_without_aux, targets)        # 计算所有图片中目标框的总数        # Compute the average number of target boxes accross all nodes, for normalization purposes        num_boxes = sum(len(t["labels"]) for t in targets)        num_boxes = torch.as_tensor([num_boxes], dtype=torch.float, device=next(iter(outputs.values())).device)        if is_dist_avail_and_initialized():            torch.distributed.all_reduce(num_boxes)        num_boxes = torch.clamp(num_boxes / get_world_size(), min=1).item() # 平均每个GPU的boxes数        # 计算主要损失        # Compute all the requested losses        losses = {}        for loss in self.losses:            losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes))        # 计算辅助损失        # 略。。。        return losses

匈牙利匹配

匈牙利算法(Hungarian Algorithm)是一个经典的二分图最优匹配算法,1955年就提出了,用于解决任务分配问题。DETR的创新在于如何构造成本矩阵和将其应用到目标检测中。

def forward(self, outputs, targets):    bs, num_queries = outputs["pred_logits"].shape[:2]    # 1. 获取预测结果    # We flatten to compute the cost matrices in a batch    out_prob = outputs["pred_logits"].flatten(0, 1).softmax(-1)  # [batch_size * num_queries, num_classes]    out_bbox = outputs["pred_boxes"].flatten(0, 1)  # [batch_size * num_queries, 4]    # 2. 获取真实标签    # Also concat the target labels and boxes    tgt_ids = torch.cat([v["labels"] for v in targets])    tgt_bbox = torch.cat([v["boxes"] for v in targets])    # 3. 计算三种cost    # Compute the classification cost. Contrary to the loss, we don't use the NLL,    # but approximate it in 1 - proba[target class].    # The 1 is a constant that doesn't change the matching, it can be ommitted .    cost_class = -out_prob[:, tgt_ids]  # 分类cost    # Compute the L1 cost between boxes    cost_bbox = torch.cdist(out_bbox, tgt_bbox, p=1)    # bbox的L1距离cost    # Compute the giou cost betwen boxes    cost_giou = -generalized_box_iou(box_cxcywh_to_xyxy(out_bbox), box_cxcywh_to_xyxy(tgt_bbox))    # GIOU cost    # Final cost matrix    C = self.cost_bbox * cost_bbox + self.cost_class * cost_class + self.cost_giou * cost_giou  # 带权重的cost矩阵    C = C.view(bs, num_queries, -1).cpu()    # -1是指当前bs的所有图片的目标总数    # 4. 匈牙利算法求解    sizes = [len(v["boxes"]) for v in targets]    indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]    # 返回匹配结果    return [(torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)) for i, j in indices]

具体匈牙利算法求解的部分:

理解 C.split(sizes, -1)
        sizes = [len(v["boxes"]) for v in targets]        indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]

sizes 是一个列表,记录每张图片中真实目标的数量。比如 sizes = [3, 5] 表示,第一张图有 3 个目标,第二张图有 5 个目标。

C.split(sizes, -1):沿着最后一个维度,按照 sizes 中的数值分割 cost 矩阵。还是用上面的例子:

linear_sum_assignment
[linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]

对每张图片分别执行匈牙利算法,linear_sum_assignment 来自 scipy,返回最优匹配的行列索引对。还是延续上面的例子,比如对于第一张图:

get_loss

    def get_loss(self, loss, outputs, targets, indices, num_boxes, **kwargs):        loss_map = {            'labels': self.loss_labels,            'cardinality': self.loss_cardinality,            'boxes': self.loss_boxes,            'masks': self.loss_masks        }        assert loss in loss_map, f'do you really want to compute {loss} loss?'        return loss_map[loss](outputs, targets, indices, num_boxes, **kwargs)

分别调用四种 loss 的计算逻辑。

分类损失 loss_labels
    def loss_labels(self, outputs, targets, indices, num_boxes, log=True):        # 1. 获取预测的类别概率        src_logits = outputs['pred_logits']  # shape: [batch_size, num_queries, num_classes+1]        # 2. 获取匹配的索引        idx = self._get_src_permutation_idx(indices)                # 3. 构建目标类别        # 收集匹配上的真实标签        target_classes_o = torch.cat([t["labels"][J] for t, (_, J) in zip(targets, indices)])        # 创建一个全是 num_classes (背景类) 的张量        target_classes = torch.full(src_logits.shape[:2], self.num_classes,                                    dtype=torch.int64, device=src_logits.device)        # 将匹配的位置填入真实类别        target_classes[idx] = target_classes_o        # 4. 计算交叉熵损失        # empty_weight: 给背景类一个较小的权重来处理正负样本不平衡        loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes, self.empty_weight)        losses = {'loss_ce': loss_ce}        if log:            # TODO this should probably be a separate loss, not hacked in this one here            losses['class_error'] = 100 - accuracy(src_logits[idx], target_classes_o)[0]        return losses

这边构造目标类别 target_classes_o 时,举例理解:

假设 batch_size=2targets = [    {"labels": tensor([2,5,8])},        # 第一张图有3个目标,类别是2,5,8    {"labels": tensor([1,4])}           # 第二张图有2个目标,类别是1,4]indices = [    # (pred, target)    (tensor([7,23,45]), tensor([0,1,2])),  # 第一张图的匹配结果    (tensor([12,34]), tensor([0,1]))        # 第二张图的匹配结果]zip (targets, indices) 会产生:第一张图: ({ "labels" : tensor([ 2 , 5 , 8 ])}, (tensor([ 7 , 23 , 45 ]), tensor([ 0 , 1 , 2 ])))第二张图: ({ "labels" : tensor([ 1 , 4 ])}, (tensor([ 12 , 34 ]), tensor([ 0 , 1 ])))t[ "labels" ][J] 会取出匹配的标签:第一张图: [ 2 , 5 , 8 ][ 0 , 1 , 2 ] -> [ 2 , 5 , 8 ]第二张图: [ 1 , 4 ][ 0 , 1 ] -> [ 1 , 4 ]torch.cat 后:target_classes_o = tensor([2,5,8,1,4])  # 所有匹配上的目标的类别

下一步 target_classes 则是 [batch_size, query_num] 大小的,值全为背景类的张量。

target_classes[idx] = target_classes_o 则在已匹配的 idx 填充上了对应的类别。

最终分类 loss 计算的部分采用了带权重的交叉熵损失函数,处理正负样本不平衡。

基数损失 loss_cardinality
    @torch.no_grad()  # 注意这个装饰器,表示不计算梯度    def loss_cardinality(self, outputs, targets, indices, num_boxes):        pred_logits = outputs['pred_logits']  # [bs, num_queries, num_classes+1]                # 1. 获取每张图片中真实目标的数量        tgt_lengths = torch.as_tensor([len(v["labels"]) for v in targets], device=device)        # 例如: [3, 5] 表示第一张图有3个目标,第二张图有5个目标        # 2. 计算每张图片中预测为前景的数量        # pred_logits.shape[-1] - 1 是背景类的索引        card_pred = (pred_logits.argmax(-1) != pred_logits.shape[-1] - 1).sum(1)        # 例如: [4, 6] 表示第一张图预测了4个前景目标,第二张图预测了6个前景目标        # 3. 计算预测数量和真实数量之间的L1误差        card_err = F.l1_loss(card_pred.float(), tgt_lengths.float())        losses = {'cardinality_error': card_err}        return losses

这边 Cardinality Loss 是一种用于评估和监控目标检测模型在目标数量预测方面表现的指标,虽然它不直接参与模型的训练过程,但却是理解模型性能的重要工具。

回归损失 loss_boxes
    def loss_boxes(self, outputs, targets, indices, num_boxes):        """Compute the losses related to the bounding boxes, the L1 regression loss and the GIoU loss           targets dicts must contain the key "boxes" containing a tensor of dim [nb_target_boxes, 4]           The target boxes are expected in format (center_x, center_y, w, h), normalized by the image size.        """        assert 'pred_boxes' in outputs        # 1. 获取匹配的预测框索引        idx = self._get_src_permutation_idx(indices)  # 获取预测框的匹配索引        src_boxes = outputs['pred_boxes'][idx]  # 只取匹配的预测框        # 2. 获取目标框        target_boxes = torch.cat([t['boxes'][i] for t, (_, i) in zip(targets, indices)], dim=0)        # 将所有匹配的目标框拼接成一个张量        # 3. 计算L1损失        loss_bbox = F.l1_loss(src_boxes, target_boxes, reduction='none')        # 计算预测框和目标框之间的L1损失,reduction='none'表示不进行归约        losses = {}        losses['loss_bbox'] = loss_bbox.sum() / num_boxes  # 归一化损失        # 4. 计算GIoU损失        loss_giou = 1 - torch.diag(box_ops.generalized_box_iou(            box_ops.box_cxcywh_to_xyxy(src_boxes),            box_ops.box_cxcywh_to_xyxy(target_boxes)))        # 计算GIoU损失,使用对角线元素表示每个预测框与目标框的GIoU        losses['loss_giou'] = loss_giou.sum() / num_boxes  # 归一化GIoU损失        return losses

loss_boxes 具体由 loss_bbox 和 loss_giou 两部分组成

DETRSegm 简要介绍

DETRsegm 是在原始 DETR 模型基础上扩展的一个版本,主要用于处理实例分割任务。与 DETR 相比,DETRsegm 增加了以下几个关键部分:

Mask Head

self.mask_head = MaskHeadSmallConv(hidden_dim + nheads, [1024, 512, 256], hidden_dim)

MaskHeadSmallConv 是另一个模块用于生成分割 mask。它接收来自 DETR 的特征图和边界框注意力(bbox attention)作为输入,输出每个 query 的分割 mask。

Bounding Box Attention

bbox_mask = self.bbox_attention(hs[-1], memory, mask=mask)

bbox_attention 是一个多头注意力机制,用于处理边界框信息。结合了 DETR 的输出特征和编码器的记忆(memory),生成与每个查询相关的边界框注意力,类似于一个桥梁,连接目标检测和实例分割任务。

Segmentation Masks Output

seg_masks = self.mask_head(src_proj, bbox_mask, [features[2].tensors, features[1].tensors, features[0].tensors])outputs_seg_masks = seg_masks.view(bs, self.detr.num_queries, seg_masks.shape[-2], seg_masks.shape[-1])

通过 mask_head 生成的分割 mask 被调整为适合输出的形状。输出的 pred_masks 包含了每个查询的分割 mask,形状为 [batch_size, num_queries, height, width]。

这里分割头和分类头、bbox 检测头的区别就是让 query 预测出分割的 mask。

分割损失 loss_masks

    def loss_masks(self, outputs, targets, indices, num_boxes):        """Compute the losses related to the masks: the focal loss and the dice loss.           targets dicts must contain the key "masks" containing a tensor of dim [nb_target_boxes, h, w]        """        assert "pred_masks" in outputs        # 1. 获取匹配的预测mask索引        src_idx = self._get_src_permutation_idx(indices)  # 获取预测mask的索引        tgt_idx = self._get_tgt_permutation_idx(indices)  # 获取目标mask的索引        src_masks = outputs["pred_masks"]  # 预测的mask        src_masks = src_masks[src_idx]  # 只取匹配的预测mask        # 2. 获取目标mask        masks = [t["masks"] for t in targets]  # 从目标中提取mask        # TODO use valid to mask invalid areas due to padding in loss        target_masks, valid = nested_tensor_from_tensor_list(masks).decompose()  # 将目标mask转换为张量        target_masks = target_masks.to(src_masks)  # 将目标mask移动到与预测mask相同的设备        target_masks = target_masks[tgt_idx]  # 只取匹配的目标mask        # 3. 将预测mask上采样到目标大小        # upsample predictions to the target size        src_masks = interpolate(src_masks[:, None], size=target_masks.shape[-2:], mode="bilinear", align_corners=False)        src_masks = src_masks[:, 0].flatten(1)  # 将上采样后的mask展平        target_masks = target_masks.flatten(1)  # 将目标mask展平        target_masks = target_masks.view(src_masks.shape)  # 确保目标mask与预测mask形状一致        # 4. 计算损失        losses = {            "loss_mask": sigmoid_focal_loss(src_masks, target_masks, num_boxes),  # 计算焦点损失            "loss_dice": dice_loss(src_masks, target_masks, num_boxes),  # 计算 Dice 损失        }        return losses

这里只计算匹配上的 mask 的损失,未匹配的预测(即背景)不参与 mask 损失计算。避免了显式处理背景 mask。

evaluate

postprocessors

    postprocessors = {'bbox': PostProcess()}    if args.masks:        postprocessors['segm'] = PostProcessSegm()        if args.dataset_file == "coco_panoptic":            is_thing_map = {i: i <= 90 for i in range(201)}            postprocessors["panoptic"] = PostProcessPanoptic(is_thing_map, threshold=0.85)

bbox postprocessor:PostProcess

class PostProcess(nn.Module):    """ This module converts the model's output into the format expected by the coco api"""    @torch.no_grad()    def forward(self, outputs, target_sizes):        """ Perform the computation        Parameters:            outputs: raw outputs of the model            target_sizes: tensor of dimension [batch_size x 2] containing the size of each images of the batch                          For evaluation, this must be the original image size (before any data augmentation)                          For visualization, this should be the image size after data augment, but before padding        """        out_logits, out_bbox = outputs['pred_logits'], outputs['pred_boxes']        # out_logits: [batch_size, num_queries, num_classes+1]        # out_bbox: [batch_size, num_queries, 4]        assert len(out_logits) == len(target_sizes)        assert target_sizes.shape[1] == 2        prob = F.softmax(out_logits, -1)  # 将logits转换为概率        scores, labels = prob[..., :-1].max(-1)  # 获取最高概率及其对应的类别        # 边界框坐标转换: 将边界框从中心点格式 [cx, cy, w, h] 转换为左上右下角格式 [x1, y1, x2, y2]        # convert to [x0, y0, x1, y1] format        boxes = box_ops.box_cxcywh_to_xyxy(out_bbox)        # and from relative [0, 1] to absolute [0, height] coordinates        img_h, img_w = target_sizes.unbind(1)        scale_fct = torch.stack([img_w, img_h, img_w, img_h], dim=1)        boxes = boxes * scale_fct[:, None, :]        results = [{'scores': s, 'labels': l, 'boxes': b} for s, l, b in zip(scores, labels, boxes)]        return results

主要做了两件事:

segm postprocessor:PostProcessSegm

class PostProcessSegm(nn.Module):    def __init__(self, threshold=0.5):        super().__init__()        self.threshold = threshold    @torch.no_grad()    def forward(self, results, outputs, orig_target_sizes, max_target_sizes):        assert len(orig_target_sizes) == len(max_target_sizes)        # 1. 获取最大尺寸        max_h, max_w = max_target_sizes.max(0)[0].tolist()        # 2. 处理预测mask        outputs_masks = outputs["pred_masks"].squeeze(2)        # 上采样到最大尺寸        outputs_masks = F.interpolate(outputs_masks, size=(max_h, max_w), mode="bilinear", align_corners=False)        # 二值化:sigmoid后与阈值比较,得到二值mask        outputs_masks = (outputs_masks.sigmoid() > self.threshold).cpu()        for i, (cur_mask, t, tt) in enumerate(zip(outputs_masks, max_target_sizes, orig_target_sizes)):            img_h, img_w = t[0], t[1]            # 1. 裁剪到实际图片大小            results[i]["masks"] = cur_mask[:, :img_h, :img_w].unsqueeze(1)            # 2. 调整到原始图片尺寸            results[i]["masks"] = F.interpolate(                results[i]["masks"].float(), size=tuple(tt.tolist()), mode="nearest"            ).byte()            # 以上操作能够便于对不同大小的输入图片进行处理        return results

CocoEvaluator

evaluate metrics 的部分主要复用了 coco 数据集的评测指标,主要包含:

    目标检测指标 (bbox):
    实例分割指标 (segm):

总结核心流程

    模型构建流程

      Backbone (ResNet50) 提取多尺度特征Position Encoding 生成位置编码Transformer Encoder 处理图像特征Transformer Decoder 结合 query 生成目标表示预测头输出类别和边界框坐标

    数据流转过程

      输入图像 → Backbone 提取特征特征图 + 位置编码 → EncoderEncoder 输出 + query embedding → DecoderDecoder 输出 → 分类头和回归头最终输出预测的类别和边界框

    匹配和损失计算

      使用匈牙利算法进行预测和真实目标的一对一匹配计算分类损失(交叉熵)计算回归损失(L1 Loss 和 GIoU Loss)计算基数损失(监控预测目标数量)所有损失加权组合

    扩展实例分割

      增加边界框注意力机制添加mask头生成分割mask计算mask损失(Focal Loss 和 Dice Loss)处理mask的尺寸对齐

    后处理和评估

      处理预测结果的格式转换调整预测框和mask的尺寸使用 COCO 评测指标进行评估计算各种精度和召回率指标

全景分割相关的先跳过,不做展开

参考

DETR 代码:github.com/facebookres…

带注释的 DETR 代码:github.com/mercurylib/…

DETR 论文:arxiv.org/abs/2005.12…

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

DETR Transformer 目标检测 代码解析 计算机视觉
相关文章