DETR source code reading

Foreword

This article is mainly a record of my reading of the DETR source code in mmdet. If you find any errors or problems, please correct me.
Reference article: DETR source code reading

Principle introduction


The principle of DETR is very simple. The input image first passes through a CNN backbone to obtain a feature map. DETR does not use multi-scale feature maps, so there is only one output feature map. The feature map is then flattened, combined with positional encoding, and sent to a standard transformer encoder. The encoder output goes into the decoder together with the learned object queries; finally, the prediction heads predict a bounding box and a category for each query.
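
To make the data flow above concrete, here is a minimal shape-tracing sketch in plain Python. The function name detr_shapes is hypothetical, and the sketch assumes a backbone with a total stride of 32 (as in ResNet-50) and a padded input size divisible by that stride; embed_dim=256 and num_query=100 are the values that appear later in this article.

```python
def detr_shapes(batch_size, height, width, stride=32, embed_dim=256, num_query=100):
    """Trace the tensor shapes through the pipeline described above."""
    # Assumption: the backbone shrinks each spatial side by `stride`
    # and the padded input size is divisible by it.
    assert height % stride == 0 and width % stride == 0
    h, w = height // stride, width // stride
    return {
        "feature_map": (batch_size, embed_dim, h, w),      # after the 1x1 projection
        "encoder_input": (h * w, batch_size, embed_dim),   # flattened tokens
        "decoder_output": (num_query, batch_size, embed_dim),
        "boxes": (batch_size, num_query, 4),
    }


shapes = detr_shapes(batch_size=2, height=800, width=1216)
print(shapes["feature_map"])
print(shapes["encoder_input"])
```

For an 800 x 1216 input the feature map is 25 x 38, so the encoder works on 950 tokens per image.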

Code reading

Let's skip the data processing in train_pipeline and go straight to the main part of the model.

Extract feature maps

First we enter SingleStageDetector. The code is in mmdet/models/detectors/single_stage.py and is as follows:

    def forward_train(self,
                      img,
                      img_metas,
                      gt_bboxes,
                      gt_labels,
                      gt_bboxes_ignore=None):
        """
        Args:
            img (Tensor): Input images of shape (N, C, H, W).
                Typically these should be mean centered and std scaled.
            img_metas (list[dict]): A List of image info dict where each dict
                has: 'img_shape', 'scale_factor', 'flip', and may also contain
                'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
                For details on the values of these keys see
                :class:`mmdet.datasets.pipelines.Collect`.
            gt_bboxes (list[Tensor]): Each item are the truth boxes for each
                image in [tl_x, tl_y, br_x, br_y] format.
            gt_labels (list[Tensor]): Class indices corresponding to each box
            gt_bboxes_ignore (None | list[Tensor]): Specify which bounding
                boxes can be ignored when computing the loss.

        Returns:
            dict[str, Tensor]: A dictionary of loss components.
        """
        super(SingleStageDetector, self).forward_train(img, img_metas)
        x = self.extract_feat(img) 
        losses = self.bbox_head.forward_train(x, img_metas, gt_bboxes,
                                              gt_labels, gt_bboxes_ignore)
        return losses

First, the feature map is extracted through x = self.extract_feat(img). The backbone used here is a ResNet, and only the feature map of its last stage is output. Then we enter self.bbox_head.forward_train, which is the forward_train of DETRHead.

DETRHead.forward_single()

After entering the forward_train of DETRHead, the forward output of the model is computed by the following code:

        outs = self(x, img_metas)

At this point, the program jumps to the forward function in DETRHead. The code is as follows:

    def forward(self, feats, img_metas):
        """Forward function."""
        num_levels = len(feats)
        img_metas_list = [img_metas for _ in range(num_levels)]
        return multi_apply(self.forward_single, feats, img_metas_list)

Here, multi_apply calls self.forward_single once per feature level. Since there is only one feature level, forward_single is executed only once.
Next, the program enters forward_single.
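
The behaviour of multi_apply can be sketched in a few lines of plain Python. This is a simplified re-implementation for illustration, not necessarily identical to the helper shipped with mmdet, and forward_single below is a hypothetical stand-in that just returns two strings.

```python
from functools import partial


def multi_apply(func, *args, **kwargs):
    """Apply `func` to each group of arguments and regroup the outputs."""
    pfunc = partial(func, **kwargs) if kwargs else func
    map_results = map(pfunc, *args)
    # zip(*...) turns [(a0, b0), (a1, b1)] into ([a0, a1], [b0, b1])
    return tuple(map(list, zip(*map_results)))


def forward_single(feat, meta):
    # Hypothetical stand-in for DETRHead.forward_single
    return f"cls({feat})", f"box({feat})"


# One feature level, so forward_single runs exactly once
cls_scores, bbox_preds = multi_apply(forward_single, ["feat0"], ["meta0"])
print(cls_scores, bbox_preds)
```

Each returned value is a list with one entry per feature level, which is why the outputs of a single-level model are still wrapped in lists.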

Generate mask matrix

The code is as follows:

        batch_size = x.size(0)
        input_img_h, input_img_w = img_metas[0]['batch_input_shape']
        masks = x.new_ones((batch_size, input_img_h, input_img_w))
        for img_id in range(batch_size):
            img_h, img_w, _ = img_metas[img_id]['img_shape']
            masks[img_id, :img_h, :img_w] = 0

        x = self.input_proj(x)
        # interpolate masks to have the same spatial shape with x
        masks = F.interpolate(
            masks.unsqueeze(1), size=x.shape[-2:]).to(torch.bool).squeeze(1)

Images in a batch have different sizes, so they are padded to a common size. The padded part should be ignored when multi-head attention is computed later, so a mask matrix is needed to cover it. Its shape is [batch, input_img_h, input_img_w], where input_img_h and input_img_w are the padded size of the batch, and img_h and img_w are the size of each image before padding.

In the mask matrix, 0 represents the effective area, and 1 represents the pad area.
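
The mask construction and its resizing can be sketched with nested lists instead of tensors. The helper names build_padding_mask and nearest_resize are hypothetical, and nearest_resize assumes that F.interpolate is used in its default nearest-neighbour mode, where output index i samples input index floor(i * in_size / out_size).

```python
def build_padding_mask(batch_h, batch_w, img_shapes):
    """Return one [batch_h][batch_w] mask per image: 0 = valid, 1 = padding."""
    masks = []
    for img_h, img_w in img_shapes:
        mask = [[1] * batch_w for _ in range(batch_h)]
        for y in range(img_h):
            for x in range(img_w):
                mask[y][x] = 0
        masks.append(mask)
    return masks


def nearest_resize(mask, out_h, out_w):
    """Nearest-neighbour resize, then conversion to bool."""
    in_h, in_w = len(mask), len(mask[0])
    return [[bool(mask[y * in_h // out_h][x * in_w // out_w])
             for x in range(out_w)]
            for y in range(out_h)]


# Batch padded to 8x8; the second image is only 4 rows by 6 columns
masks = build_padding_mask(8, 8, [(8, 8), (4, 6)])
small = nearest_resize(masks[1], 2, 2)
print(small)
```

In the resized mask, True marks feature-map positions that fall in the padded area and should be ignored by attention.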

        x = self.input_proj(x)

self.input_proj is a 1x1 convolution with 2048 input channels and 256 output channels. This line changes the number of channels of x.

Note that at this point the mask still has the size of the padded input image, while the feature map has been downsampled by resnet50. The mask therefore has to be downsampled to the spatial size of the feature map with F.interpolate:

        # interpolate masks to have the same spatial shape with x
        masks = F.interpolate(
            masks.unsqueeze(1), size=x.shape[-2:]).to(torch.bool).squeeze(1)

Generate positional_encoding

In DETRHead's forward_single(), positional_encoding is generated by the following line of code

        pos_embed = self.positional_encoding(masks)

SinePositionalEncoding is used here. The code is in mmdet/models/utils/positional_encoding.py, and I will not go into detail here. The final shape of pos_embed is [bs, embed_dim, h, w].
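
For readers who want a feel for what a sine positional encoding computes, here is a simplified sketch for a single (y, x) position. It is not the mmdet implementation: the function names are hypothetical, the coordinates are passed in directly instead of being derived from the mask, and options such as normalization and offsets are left out. The temperature of 10000 and the 128 features per axis are assumed typical values.

```python
import math


def sine_encode_axis(position, num_feats, temperature=10000):
    """Encode one coordinate: sin on even channels, cos on odd channels."""
    out = []
    for i in range(num_feats):
        # Channels 2k and 2k+1 share the same frequency
        dim_t = temperature ** (2 * (i // 2) / num_feats)
        angle = position / dim_t
        out.append(math.sin(angle) if i % 2 == 0 else math.cos(angle))
    return out


def sine_position(y, x, num_feats=128):
    # y part first, then x part: 2 * num_feats channels in total
    return sine_encode_axis(y, num_feats) + sine_encode_axis(x, num_feats)


vec = sine_position(y=3, x=5)
print(len(vec))
```

With 128 features per axis the vector has 256 channels, which matches the embed_dim of the projected feature map so that the two can be added.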

Enter transformer

In DETRHead's forward_single(), enter the transformer through the following line of code

        # outs_dec: [nb_dec, bs, num_query, embed_dim]
        outs_dec, _ = self.transformer(x, masks, self.query_embedding.weight,
                                       pos_embed)

The code enters the forward of the Transformer class in mmdet/models/utils/transformer.py. It first prepares the inputs, for example by flattening the feature map, and then sends them to the encoder and decoder.

        bs, c, h, w = x.shape
        # use `view` instead of `flatten` for dynamically exporting to ONNX
        # Flatten the feature map
        x = x.view(bs, c, -1).permute(2, 0, 1)  # [bs, c, h, w] -> [h*w, bs, c] 
        # Flatten pos_embed as well [bs,c,h,w] -> [h*w, bs, c] 
        pos_embed = pos_embed.view(bs, c, -1).permute(2, 0, 1)
        # Repeat query_embed for every image in the batch
        query_embed = query_embed.unsqueeze(1).repeat(
            1, bs, 1)  # [num_query, dim] -> [num_query, bs, dim]
        # flatten the mask
        mask = mask.view(bs, -1)  # [bs, h, w] -> [bs, h*w]
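
The index bookkeeping of view(bs, c, -1).permute(2, 0, 1) can be checked with a small nested-list sketch. The function flatten_feature_map is a hypothetical pure-Python equivalent written for illustration only.

```python
def flatten_feature_map(x):
    """[bs][c][h][w] nested lists -> [h*w][bs][c]."""
    bs, c = len(x), len(x[0])
    h, w = len(x[0][0]), len(x[0][0][0])
    # Token i corresponds to row i // w and column i % w (row-major order)
    return [[[x[b][ch][i // w][i % w] for ch in range(c)]
             for b in range(bs)]
            for i in range(h * w)]


# Toy input with bs=1, c=2, h=2, w=3; each value encodes (channel, row, col)
x = [[[[ch * 100 + r * 10 + col for col in range(3)] for r in range(2)]
      for ch in range(2)]]
tokens = flatten_feature_map(x)
print(len(tokens), tokens[4])
```

Token 4 is the pixel at row 1, column 1, and it carries the values of both channels for that pixel.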

Encoder

The encoder is entered through the following code:

        memory = self.encoder(
            query=x,
            key=None,
            value=None,
            query_pos=pos_embed,
            query_key_padding_mask=mask)

In the encoder, the query is the flattened feature map and query_pos is pos_embed, while key and value are None. When the attention in mmcv/cnn/bricks/transformer.py is computed, the code contains this line:

        temp_key = temp_value = query

That is to say, when self-attention is performed in the encoder, Q, K and V all come from the same flattened feature map.

After passing through the encoder, the returned memory is the feature map refined by the encoder. Its shape is the same as that of the input query, [h*w, bs, embed_dims].
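
The effect of the padding mask on self-attention can be illustrated with a single-head sketch in plain Python. It leaves out the learned projections, the positional encoding and the multiple heads of the real module, and the function name masked_attention_weights is hypothetical.

```python
import math


def masked_attention_weights(query, keys, key_padding_mask):
    """Softmax attention weights of one query over all keys, skipping padded keys."""
    scale = math.sqrt(len(query))
    scores = []
    for key, is_pad in zip(keys, key_padding_mask):
        if is_pad:
            scores.append(float("-inf"))  # padded positions end up with weight 0
        else:
            scores.append(sum(q * k for q, k in zip(query, key)) / scale)
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


tokens = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # flattened feature map, 3 positions
mask = [False, False, True]                    # the last position is padding
# Self-attention: the query and the keys come from the same tokens
weights = masked_attention_weights(tokens[0], tokens, mask)
print(weights)
```

The padded position receives zero weight, so it cannot influence the output for any valid position.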

Decoder

        target = torch.zeros_like(query_embed)
        # out_dec: [num_layers, num_query, bs, dim]
        out_dec = self.decoder(
            query=target,
            key=memory,
            value=memory,
            key_pos=pos_embed,
            query_pos=query_embed,
            key_padding_mask=mask)
        out_dec = out_dec.transpose(1, 2)
        memory = memory.permute(1, 2, 0).reshape(bs, c, h, w)

Here, an all-zero target is initialized as the query. When multi-head attention is performed later, query and query_pos are added together, so in the first decoder layer the sum is exactly query_embed.

The out_dec returned by the decoder has shape [6, 100, 2, 256], i.e. [num_dec_layers, num_query, bs, embed_dims]. The transpose(1, 2) call then changes it to [num_dec_layers, bs, num_query, embed_dims].

The complete forward function of the transformer is as follows:

        bs, c, h, w = x.shape
        # use `view` instead of `flatten` for dynamically exporting to ONNX
        x = x.view(bs, c, -1).permute(2, 0, 1)  # [bs, c, h, w] -> [h*w, bs, c]
        pos_embed = pos_embed.view(bs, c, -1).permute(2, 0, 1)
        query_embed = query_embed.unsqueeze(1).repeat(
            1, bs, 1)  # [num_query, dim] -> [num_query, bs, dim]
        mask = mask.view(bs, -1)  # [bs, h, w] -> [bs, h*w]
        memory = self.encoder(
            query=x,
            key=None,
            value=None,
            query_pos=pos_embed,
            query_key_padding_mask=mask)
        target = torch.zeros_like(query_embed)
        # out_dec: [num_layers, num_query, bs, dim]
        out_dec = self.decoder(
            query=target,
            key=memory,
            value=memory,
            key_pos=pos_embed,
            query_pos=query_embed,
            key_padding_mask=mask)
        out_dec = out_dec.transpose(1, 2)
        memory = memory.permute(1, 2, 0).reshape(bs, c, h, w)
        return out_dec, memory

First the inputs are prepared, then they go through the encoder and the decoder. The returned out_dec (named outs_dec in forward_single()) holds the updated queries, and memory is the feature map after the encoder.

After going through the transformer, the program returns to forward_single() of DETRHead, and performs classification and regression prediction through the returned outs_dec.

Predict

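In DETRHead's forward_single(), the decoder output outs_dec is passed through the classification and regression branches:

        outs_dec, _ = self.transformer(x, masks, self.query_embedding.weight,
                                       pos_embed)

        all_cls_scores = self.fc_cls(outs_dec)
        all_bbox_preds = self.fc_reg(self.activate(
            self.reg_ffn(outs_dec))).sigmoid()

The shapes of the resulting all_cls_scores and all_bbox_preds are [num_layer, bs, num_query, 81] and [num_layer, bs, num_query, 4] respectively. With the COCO config, the 81 classification channels are the 80 object categories plus one background class, and the sigmoid keeps the box coordinates in the 0-1 range.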

Loss

After getting the forward prediction results, the next step is to perform Hungarian matching and calculate the loss.
The call chain for calculating the loss is as follows. First we enter the loss() function of DETRHead. There, loss_single() is called through multi_apply to calculate the loss of each decoder layer. loss_single() calls get_targets() to get the targets of the images in this batch, get_targets() calls _get_target_single() to get the targets of each image, and _get_target_single() performs the Hungarian matching.

Let's go directly to _get_target_single(), where the Hungarian matching is performed first. The matching code is in mmdet/core/bbox/assigners/hungarian_assigner.py, and my comments are written in the code:

    def assign(self,bbox_pred,cls_pred,gt_bboxes,gt_labels,img_meta,gt_bboxes_ignore=None,eps=1e-7):
  
        assert gt_bboxes_ignore is None, \
            'Only case when gt_bboxes_ignore is None is supported.'
        num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0)

        # 1. assign -1 by default
        assigned_gt_inds = bbox_pred.new_full((num_bboxes, ),
                                              -1,
                                              dtype=torch.long)
        assigned_labels = bbox_pred.new_full((num_bboxes, ),
                                             -1,
                                             dtype=torch.long)
        if num_gts == 0 or num_bboxes == 0:
            # No ground truth or boxes, return empty assignment
            if num_gts == 0:
                # No ground truth, assign all to background
                assigned_gt_inds[:] = 0
            return AssignResult(
                num_gts, assigned_gt_inds, None, labels=assigned_labels)
        img_h, img_w, _ = img_meta['img_shape']
        factor = gt_bboxes.new_tensor([img_w, img_h, img_w,
                                       img_h]).unsqueeze(0)

        # 2. compute the weighted costs
        # classification and bboxcost.
        '''
        cls_pred.shape : [100,81]
        gt_labels :[5]
        '''
        cls_cost = self.cls_cost(cls_pred, gt_labels) 
        # regression L1 cost
        # Because the predicted bbox_pred is between 0-1, it is necessary to convert gt_bboxes to 0-1 scale before calculating reg_cost
        normalize_gt_bboxes = gt_bboxes / factor
        reg_cost = self.reg_cost(bbox_pred, normalize_gt_bboxes)
        # regression iou cost, defaultly giou is used in official DETR.
        # Convert back to the original image scale and then calculate iou_cost
        bboxes = bbox_cxcywh_to_xyxy(bbox_pred) * factor
        iou_cost = self.iou_cost(bboxes, gt_bboxes)
        # weighted sum of above three costs
        # cost.shape [100,5] because here are 100 prediction boxes against 5 gt
        cost = cls_cost + reg_cost + iou_cost

        # 3. do Hungarian matching on CPU using linear_sum_assignment
        cost = cost.detach().cpu()
        if linear_sum_assignment is None:
            raise ImportError('Please run "pip install scipy" '
                              'to install scipy first.')
        matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
        matched_row_inds = torch.from_numpy(matched_row_inds).to(
            bbox_pred.device)
        matched_col_inds = torch.from_numpy(matched_col_inds).to(
            bbox_pred.device)

        # 4. assign backgrounds and foregrounds
        # assign all indices to backgrounds first
        assigned_gt_inds[:] = 0
        # assign foregrounds based on matching results
        assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
        assigned_labels[matched_row_inds] = gt_labels[matched_col_inds]
        return AssignResult(
            num_gts, assigned_gt_inds, None, labels=assigned_labels)

Back in _get_target_single(), once positive and negative samples have been matched, the matching results are organized and returned. _get_target_single() returns a tuple (labels, label_weights, bbox_targets, bbox_weights, pos_inds, neg_inds):
labels: the target category of each of the 100 predicted boxes after matching; most of them are the background category.
label_weights: the weight of each label, which is 1 here.
bbox_targets: the regression target of each predicted box (normalized, in cxcywh format). Only the predictions matched to a gt box carry that gt box's values; the rest are all 0.
bbox_weights: the weight of each box, which is 1 for predictions matched to a gt box and 0 for the rest.
pos_inds and neg_inds: the indices of the positive and negative samples.
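
The matching step in assign() can be illustrated on a toy cost matrix. The sketch below replaces scipy's linear_sum_assignment with a brute-force search over permutations, which finds the same minimum-cost one-to-one assignment but is only practical for tiny inputs. The function name match, the cost values and the labels are made up for illustration.

```python
from itertools import permutations


def match(cost):
    """Brute-force optimal assignment of each GT column to a distinct prediction row."""
    num_preds, num_gts = len(cost), len(cost[0])
    best_rows, best_total = None, float("inf")
    for rows in permutations(range(num_preds), num_gts):
        total = sum(cost[r][g] for g, r in enumerate(rows))
        if total < best_total:
            best_rows, best_total = rows, total
    return list(best_rows), list(range(num_gts))


# 4 predictions (rows) x 2 ground-truth boxes (columns)
cost = [[0.9, 0.2],
        [0.1, 0.8],
        [0.5, 0.6],
        [0.7, 0.3]]
gt_labels = [17, 3]
rows, cols = match(cost)

assigned_gt_inds = [0] * len(cost)   # 0 means background
assigned_labels = [-1] * len(cost)
for r, g in zip(rows, cols):
    assigned_gt_inds[r] = g + 1      # 1-based index of the matched GT
    assigned_labels[r] = gt_labels[g]
print(assigned_gt_inds, assigned_labels)
```

Prediction 1 is matched to the first gt box and prediction 0 to the second; the two remaining predictions stay background, which is why most of the 100 queries end up as negative samples.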

After _get_target_single() has been executed batch_size times, the code returns to get_targets(), which returns the labels and bbox targets of this batch to loss_single():

    def loss_single(self,
                    cls_scores,
                    bbox_preds,
                    gt_bboxes_list,
                    gt_labels_list,
                    img_metas,
                    gt_bboxes_ignore_list=None):

        num_imgs = cls_scores.size(0)
        cls_scores_list = [cls_scores[i] for i in range(num_imgs)]
        bbox_preds_list = [bbox_preds[i] for i in range(num_imgs)]
        cls_reg_targets = self.get_targets(cls_scores_list, bbox_preds_list,
                                           gt_bboxes_list, gt_labels_list,
                                           img_metas, gt_bboxes_ignore_list)
        (labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
         num_total_pos, num_total_neg) = cls_reg_targets
        # concatenate the results of all images in the batch
        labels = torch.cat(labels_list, 0)    #[bs*num_query]
        label_weights = torch.cat(label_weights_list, 0)  #[bs*num_query]
        bbox_targets = torch.cat(bbox_targets_list, 0)   #[bs*num_query,4]
        bbox_weights = torch.cat(bbox_weights_list, 0)   #[bs*num_query,4]

        # classification loss
        # self.cls_out_channels:81
        cls_scores = cls_scores.reshape(-1, self.cls_out_channels)  # [bs*num_query,81]
        # construct weighted avg_factor to match with the official DETR repo
        cls_avg_factor = num_total_pos * 1.0 + \
            num_total_neg * self.bg_cls_weight
        if self.sync_cls_avg_factor:
            cls_avg_factor = reduce_mean(
                cls_scores.new_tensor([cls_avg_factor]))
        # The role of cls_avg_factor: Average factor that is used to average the loss
        cls_avg_factor = max(cls_avg_factor, 1)
        # self.loss_cls: CrossEntropyLoss()
        loss_cls = self.loss_cls(
            cls_scores, labels, label_weights, avg_factor=cls_avg_factor)

        # Compute the average number of gt boxes across all gpus, for
        # normalization purposes
        num_total_pos = loss_cls.new_tensor([num_total_pos])
        num_total_pos = torch.clamp(reduce_mean(num_total_pos), min=1).item()

        # construct factors used for rescale bboxes
        # Calculate the scaling factor for each image in the batch
        factors = []
        for img_meta, bbox_pred in zip(img_metas, bbox_preds):
            img_h, img_w, _ = img_meta['img_shape']
            factor = bbox_pred.new_tensor([img_w, img_h, img_w,
                                           img_h]).unsqueeze(0).repeat(
                                               bbox_pred.size(0), 1)
            factors.append(factor)
        factors = torch.cat(factors, 0)  # [bs*num_query,4]

        # DETR regress the relative position of boxes (cxcywh) in the image,
        # thus the learning target is normalized by the image size. So here
        # we need to re-scale them for calculating IoU loss
        # The calculation of iou_loss needs to be performed on the original image size, and the encoding method is xyxy 
        bbox_preds = bbox_preds.reshape(-1, 4)
        bboxes = bbox_cxcywh_to_xyxy(bbox_preds) * factors
        bboxes_gt = bbox_cxcywh_to_xyxy(bbox_targets) * factors

        # regression IoU loss, defaultly GIoU loss
        loss_iou = self.loss_iou(
            bboxes, bboxes_gt, bbox_weights, avg_factor=num_total_pos)

        # regression L1 loss
        # The calculation of l1 loss is carried out on the 0-1 scale, and the encoding method is [x,y,w,h]
        loss_bbox = self.loss_bbox(
            bbox_preds, bbox_targets, bbox_weights, avg_factor=num_total_pos)
        return loss_cls, loss_bbox, loss_iou
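
The two box representations used in loss_single() can be illustrated for a single box in plain Python. The helper names below are hypothetical re-implementations for illustration, not the mmdet functions, and the GIoU loss is taken to be 1 - GIoU.

```python
def cxcywh_to_xyxy(box):
    """(center x, center y, width, height) -> (x1, y1, x2, y2)."""
    cx, cy, w, h = box
    return (cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2)


def scale(box, img_w, img_h):
    """Rescale a normalized xyxy box to pixel coordinates."""
    x1, y1, x2, y2 = box
    return (x1 * img_w, y1 * img_h, x2 * img_w, y2 * img_h)


def giou(a, b):
    """Generalized IoU of two xyxy boxes."""
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    iw = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    ih = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = iw * ih
    union = area_a + area_b - inter
    # Area of the smallest box enclosing both a and b
    hull = (max(a[2], b[2]) - min(a[0], b[0])) * (max(a[3], b[3]) - min(a[1], b[1]))
    return inter / union - (hull - union) / hull


pred = (0.5, 0.5, 0.5, 0.5)    # normalized cxcywh prediction
target = (0.5, 0.5, 0.5, 0.5)  # normalized cxcywh target
l1 = sum(abs(p - t) for p, t in zip(pred, target))   # L1 on the 0-1 scale
pred_xyxy = scale(cxcywh_to_xyxy(pred), img_w=640, img_h=480)
target_xyxy = scale(cxcywh_to_xyxy(target), img_w=640, img_h=480)
giou_loss = 1 - giou(pred_xyxy, target_xyxy)          # IoU loss in pixel xyxy
print(pred_xyxy, l1, giou_loss)
```

A perfect prediction gives zero for both losses, while two disjoint boxes give a negative GIoU and therefore a GIoU loss greater than 1.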

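The loss_single() function is executed 6 times, once for each decoder layer. The loss of the last decoder layer is stored under the plain keys, and the losses of the earlier layers are stored with a d{i}. prefix as auxiliary losses. As far as I can tell, mmdet sums every entry of the loss dict whose key contains 'loss', so the auxiliary losses also contribute to the gradient, not only the loss of the last decoder layer.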
Finally, we go back to the loss() function, which simply collects the losses into loss_dict and returns it:

        losses_cls, losses_bbox, losses_iou = multi_apply(
            self.loss_single, all_cls_scores, all_bbox_preds,
            all_gt_bboxes_list, all_gt_labels_list, img_metas_list,
            all_gt_bboxes_ignore_list)

        loss_dict = dict()
        # loss from the last decoder layer
        loss_dict['loss_cls'] = losses_cls[-1]
        loss_dict['loss_bbox'] = losses_bbox[-1]
        loss_dict['loss_iou'] = losses_iou[-1]
        # loss from other decoder layers
        num_dec_layer = 0
        for loss_cls_i, loss_bbox_i, loss_iou_i in zip(losses_cls[:-1],
                                                       losses_bbox[:-1],
                                                       losses_iou[:-1]):
            loss_dict[f'd{num_dec_layer}.loss_cls'] = loss_cls_i
            loss_dict[f'd{num_dec_layer}.loss_bbox'] = loss_bbox_i
            loss_dict[f'd{num_dec_layer}.loss_iou'] = loss_iou_i
            num_dec_layer += 1
        return loss_dict

After the loss is calculated, the gradients are back-propagated, the parameters are updated, and the next forward pass begins.
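
The structure of loss_dict can be reproduced with plain floats. The per-layer values below are made up, build_loss_dict is a hypothetical helper, and the final sum assumes that the training loop adds up every entry whose key contains "loss"; treat that last step as an assumption about mmdet rather than a quote from its code.

```python
def build_loss_dict(losses_cls, losses_bbox, losses_iou):
    """Last decoder layer gets the plain keys, earlier layers get a d{i}. prefix."""
    loss_dict = {
        "loss_cls": losses_cls[-1],
        "loss_bbox": losses_bbox[-1],
        "loss_iou": losses_iou[-1],
    }
    layers = zip(losses_cls[:-1], losses_bbox[:-1], losses_iou[:-1])
    for i, (l_cls, l_bbox, l_iou) in enumerate(layers):
        loss_dict[f"d{i}.loss_cls"] = l_cls
        loss_dict[f"d{i}.loss_bbox"] = l_bbox
        loss_dict[f"d{i}.loss_iou"] = l_iou
    return loss_dict


# Hypothetical per-layer loss values for 6 decoder layers
loss_dict = build_loss_dict([1.0] * 6, [0.5] * 6, [0.25] * 6)
# Assumption: the total loss is the sum of all entries whose key contains "loss"
total = sum(v for k, v in loss_dict.items() if "loss" in k)
print(len(loss_dict), total)
```

With 6 decoder layers and 3 loss terms per layer, the dictionary holds 18 entries.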

Tags: Python Deep Learning AI Object Detection

Posted by Bazzaah on Sat, 08 Oct 2022 09:52:16 +1030