Foreword
This article is mainly a record of my reading of the DETR source code in mmdet. If there are any errors or problems, please point them out.
Reference article: DETR source code reading
Principle introduction
The principle of DETR is very simple. The input image is first passed through a CNN backbone to obtain a feature map. DETR does not use multi-scale feature maps, so there is only one output feature map. The feature map is then flattened, positional encoding is added, and the result is sent into a standard transformer encoder. The encoder output then enters the decoder together with the learned object queries; finally, the boxes and categories are predicted by the prediction heads.
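To make the overall data flow concrete, here is a minimal, self-contained sketch of this pipeline using plain PyTorch modules as stand-ins (positional encoding and the padding mask are omitted for brevity; all module names and sizes are illustrative and none of this is the mmdet implementation):

import torch
from torch import nn

bs, dim, num_query, num_classes = 2, 256, 100, 80
backbone = nn.Conv2d(3, 2048, kernel_size=32, stride=32)   # stand-in for ResNet (1/32 downsampling)
input_proj = nn.Conv2d(2048, dim, kernel_size=1)            # 2048 -> 256 channels
transformer = nn.Transformer(d_model=dim, num_encoder_layers=6, num_decoder_layers=6)
query_embed = nn.Embedding(num_query, dim)                   # learned object queries
cls_head = nn.Linear(dim, num_classes + 1)
reg_head = nn.Linear(dim, 4)

img = torch.randn(bs, 3, 800, 1216)
feat = input_proj(backbone(img))                             # [bs, 256, 25, 38]
src = feat.flatten(2).permute(2, 0, 1)                       # [h*w, bs, 256]
tgt = query_embed.weight.unsqueeze(1).repeat(1, bs, 1)       # [num_query, bs, 256]
hs = transformer(src, tgt)                                   # [num_query, bs, 256]
cls_scores = cls_head(hs)                                    # [num_query, bs, 81]
bbox_preds = reg_head(hs).sigmoid()                          # [num_query, bs, 4], normalized cxcywh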
Code reading
Let's not look at the processing of the data in the previous train_pipeline, and directly enter the main part of the model.
Extract feature maps
First enter SingleStageDetector, the code is in mmdet/models/detectors/single_stage.py, the code is as follows:
def forward_train(self,
                  img,
                  img_metas,
                  gt_bboxes,
                  gt_labels,
                  gt_bboxes_ignore=None):
    """
    Args:
        img (Tensor): Input images of shape (N, C, H, W).
            Typically these should be mean centered and std scaled.
        img_metas (list[dict]): A List of image info dict where each dict
            has: 'img_shape', 'scale_factor', 'flip', and may also contain
            'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
            For details on the values of these keys see
            :class:`mmdet.datasets.pipelines.Collect`.
        gt_bboxes (list[Tensor]): Each item are the truth boxes for each
            image in [tl_x, tl_y, br_x, br_y] format.
        gt_labels (list[Tensor]): Class indices corresponding to each box
        gt_bboxes_ignore (None | list[Tensor]): Specify which bounding
            boxes can be ignored when computing the loss.

    Returns:
        dict[str, Tensor]: A dictionary of loss components.
    """
    super(SingleStageDetector, self).forward_train(img, img_metas)
    x = self.extract_feat(img)
    losses = self.bbox_head.forward_train(x, img_metas, gt_bboxes,
                                          gt_labels, gt_bboxes_ignore)
    return losses
First, the feature map is extracted through x = self.extract_feat(img); ResNet is used here, and only the feature map of the last stage is output. The code then enters self.bbox_head.forward_train, which is the forward_train of DETRHead.
DETRHead.forward_single()
After entering the forward_train of DETRHead, the output of the model's forward pass is computed by the following code:
outs = self(x, img_metas)
At this point, the program jumps to the forward function in DETRHead. The code is as follows:
def forward(self, feats, img_metas):
    """Forward function."""
    num_levels = len(feats)
    img_metas_list = [img_metas for _ in range(num_levels)]
    return multi_apply(self.forward_single, feats, img_metas_list)
Here, self.forward_single is called by the multi_apply function to run the forward pass once per feature level. Since there is only one feature level, forward_single is executed only once.
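multi_apply itself is just a map-then-transpose helper; a sketch of what it does (with a hypothetical dummy function for illustration):

from functools import partial

def multi_apply(func, *args, **kwargs):
    # Apply `func` to each group of arguments and transpose the per-call
    # results into per-output lists (a sketch of what mmdet's helper does).
    pfunc = partial(func, **kwargs) if kwargs else func
    map_results = map(pfunc, *args)
    return tuple(map(list, zip(*map_results)))

# Example: one call per feature level, results regrouped by output.
def dummy_single(feat, img_metas):
    return feat * 2, len(img_metas)

feats = [1, 2, 3]
metas = [['a'], ['a', 'b'], ['a', 'b', 'c']]
print(multi_apply(dummy_single, feats, metas))  # ([2, 4, 6], [1, 2, 3])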
The program then enters forward_single.
Generate mask matrix
The code is as follows:
batch_size = x.size(0)
input_img_h, input_img_w = img_metas[0]['batch_input_shape']
masks = x.new_ones((batch_size, input_img_h, input_img_w))
for img_id in range(batch_size):
    img_h, img_w, _ = img_metas[img_id]['img_shape']
    masks[img_id, :img_h, :img_w] = 0
x = self.input_proj(x)
# interpolate masks to have the same spatial shape with x
masks = F.interpolate(
    masks.unsqueeze(1), size=x.shape[-2:]).to(torch.bool).squeeze(1)
The images in a batch are padded to a common size, and the padded regions must be ignored in the subsequent multi-head attention computation, so a mask matrix of shape [batch, input_img_h, input_img_w] is needed to mask them out. input_img_h and input_img_w are the padded size; img_h and img_w are the original image size.
In the mask matrix, 0 marks the valid area and 1 marks the padded area.
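As a toy example of what the mask looks like, suppose a 2×3 image is padded to a 3×4 batch_input_shape:

import torch

# batch_input_shape (padded): 3 x 4, original img_shape: 2 x 3
mask = torch.ones((1, 3, 4))
mask[0, :2, :3] = 0   # 0 = valid pixels, 1 = padded pixels
# mask[0] is now:
# tensor([[0., 0., 0., 1.],
#         [0., 0., 0., 1.],
#         [1., 1., 1., 1.]])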
x = self.input_proj(x)
self.input_proj is a 1×1 convolution with 2048 input channels and 256 output channels. This line of code changes the number of channels of x.
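input_proj is presumably just a 1×1 convolution along these lines (a sketch, not the exact mmdet definition):

import torch
from torch import nn

# A 1x1 convolution mapping the 2048-channel ResNet feature map to 256 channels
# (sketch of what self.input_proj does; the real layer is built inside DETRHead).
input_proj = nn.Conv2d(2048, 256, kernel_size=1)
x = torch.randn(2, 2048, 25, 38)
print(input_proj(x).shape)  # torch.Size([2, 256, 25, 38])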
Note that the mask at this point has the size of the padded input image, while the feature map has been downsampled by ResNet-50, so the mask needs to be further downsampled to the same spatial size as the feature map through F.interpolate:
# interpolate masks to have the same spatial shape with x
masks = F.interpolate(
    masks.unsqueeze(1), size=x.shape[-2:]).to(torch.bool).squeeze(1)
Generate positional_encoding
In DETRHead's forward_single(), positional_encoding is generated by the following line of code
pos_embed = self.positional_encoding(masks)
SinePositionalEncoding is used here; the specific code is in mmdet/models/utils/positional_encoding.py, so I will not go into detail. The final shape of pos_embed is [bs, embed_dim, h, w].
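For reference, here is a simplified sketch of what SinePositionalEncoding computes, following the original DETR formulation (the actual mmdet implementation additionally supports normalization, scale and offset options):

import torch

def sine_pos_encoding(mask, num_feats=128, temperature=10000):
    # mask: [bs, h, w], 1 (or True) for padded pixels, 0 (or False) for valid pixels
    not_mask = (mask == 0).float()
    y_embed = not_mask.cumsum(1)   # running row index of valid pixels
    x_embed = not_mask.cumsum(2)   # running column index of valid pixels
    dim_t = temperature ** (2 * (torch.arange(num_feats) // 2).float() / num_feats)
    pos_x = x_embed[:, :, :, None] / dim_t
    pos_y = y_embed[:, :, :, None] / dim_t
    # interleave sin on even dims and cos on odd dims
    pos_x = torch.stack((pos_x[..., 0::2].sin(), pos_x[..., 1::2].cos()), dim=4).flatten(3)
    pos_y = torch.stack((pos_y[..., 0::2].sin(), pos_y[..., 1::2].cos()), dim=4).flatten(3)
    # concatenate the y and x encodings: [bs, h, w, 2*num_feats] -> [bs, 2*num_feats, h, w]
    return torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)

pos_embed = sine_pos_encoding(torch.zeros(2, 25, 38))
print(pos_embed.shape)  # torch.Size([2, 256, 25, 38])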
Enter the transformer
In DETRHead's forward_single(), enter the transformer through the following line of code
# outs_dec: [nb_dec, bs, num_query, embed_dim]
outs_dec, _ = self.transformer(x, masks, self.query_embedding.weight,
                               pos_embed)
The code enters the forward of the Transformer class in mmdet/models/utils/transformer.py. First, some preparation is done (flattening the feature map, etc.), and then everything is sent into the transformer.
bs, c, h, w = x.shape
# use `view` instead of `flatten` for dynamically exporting to ONNX
# flatten the feature map: [bs, c, h, w] -> [h*w, bs, c]
x = x.view(bs, c, -1).permute(2, 0, 1)
# flatten pos_embed as well: [bs, c, h, w] -> [h*w, bs, c]
pos_embed = pos_embed.view(bs, c, -1).permute(2, 0, 1)
# repeat query_embed for each image in the batch:
# [num_query, dim] -> [num_query, bs, dim]
query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
# flatten the mask: [bs, h, w] -> [bs, h*w]
mask = mask.view(bs, -1)
Encoder
Enter the encoder through the following code,
memory = self.encoder(
    query=x,
    key=None,
    value=None,
    query_pos=pos_embed,
    query_key_padding_mask=mask)
In the encoder, Q is the flattened feature map and query_pos is pos_embed, while K and V are None. When mmcv/cnn/bricks/transformer.py is called for the computation, there is a line in the code:
temp_key = temp_value = query
That is, when performing self-attention in the encoder, Q, K and V are all the same, namely the flattened feature map.
After passing through the encoder, the returned memory is the feature map processed by the encoder; its shape is the same as the input query, namely [h*w, bs, embed_dims].
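Inside the encoder self-attention, the positional encoding is added to Q and K but not to V, following the DETR convention. A minimal runnable sketch with assumed shapes, using plain torch.nn.MultiheadAttention rather than the mmcv module:

import torch
from torch import nn

# query = flattened feature map [h*w, bs, 256], query_pos = pos_embed of the same shape
hw, bs, dim = 950, 2, 256
query = torch.randn(hw, bs, dim)
query_pos = torch.randn(hw, bs, dim)
key_padding_mask = torch.zeros(bs, hw, dtype=torch.bool)  # True marks padded positions

attn = nn.MultiheadAttention(embed_dim=dim, num_heads=8)
# positional encoding is added to Q and K only; V stays the raw feature map
q = k = query + query_pos
out, _ = attn(q, k, value=query, key_padding_mask=key_padding_mask)
print(out.shape)  # torch.Size([950, 2, 256])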
Decoder
target = torch.zeros_like(query_embed)
# out_dec: [num_layers, num_query, bs, dim]
out_dec = self.decoder(
    query=target,
    key=memory,
    value=memory,
    key_pos=pos_embed,
    query_pos=query_embed,
    key_padding_mask=mask)
out_dec = out_dec.transpose(1, 2)
memory = memory.permute(1, 2, 0).reshape(bs, c, h, w)
Here, a target of all zeros is initialized as the query, because in the subsequent multi-head attention the query and query_pos are added together, which effectively adds the object queries back in.
The shape of the returned out_dec is [6, 2, 100, 256], i.e. [num_dec_layers, bs, num_query, embed_dims] (the raw decoder output [num_layers, num_query, bs, dim] is transposed before being returned).
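Each decoder layer performs self-attention among the object queries and cross-attention from the queries to the encoder memory. A minimal runnable sketch of the cross-attention step, using plain torch.nn.MultiheadAttention and assumed shapes (not mmdet code):

import torch
from torch import nn

num_query, hw, bs, dim = 100, 950, 2, 256
target = torch.zeros(num_query, bs, dim)        # starts as all zeros
query_embed = torch.randn(num_query, bs, dim)   # learned object queries (query_pos)
memory = torch.randn(hw, bs, dim)               # encoder output
pos_embed = torch.randn(hw, bs, dim)            # positional encoding of the feature map

cross_attn = nn.MultiheadAttention(embed_dim=dim, num_heads=8)
# query_pos is added to Q, key_pos to K; V is the raw memory
out, _ = cross_attn(target + query_embed, memory + pos_embed, value=memory)
print(out.shape)  # torch.Size([100, 2, 256])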
The order of the entire transformer is as follows:
bs, c, h, w = x.shape
# use `view` instead of `flatten` for dynamically exporting to ONNX
x = x.view(bs, c, -1).permute(2, 0, 1)  # [bs, c, h, w] -> [h*w, bs, c]
pos_embed = pos_embed.view(bs, c, -1).permute(2, 0, 1)
query_embed = query_embed.unsqueeze(1).repeat(
    1, bs, 1)  # [num_query, dim] -> [num_query, bs, dim]
mask = mask.view(bs, -1)  # [bs, h, w] -> [bs, h*w]
memory = self.encoder(
    query=x,
    key=None,
    value=None,
    query_pos=pos_embed,
    query_key_padding_mask=mask)
target = torch.zeros_like(query_embed)
# out_dec: [num_layers, num_query, bs, dim]
out_dec = self.decoder(
    query=target,
    key=memory,
    value=memory,
    key_pos=pos_embed,
    query_pos=query_embed,
    key_padding_mask=mask)
out_dec = out_dec.transpose(1, 2)
memory = memory.permute(1, 2, 0).reshape(bs, c, h, w)
return out_dec, memory
First the preparations are done, then the data goes through the encoder and decoder; the returned out_dec is the updated queries, and memory is the feature map after the encoder.
After going through the transformer, the program returns to forward_single() of DETRHead, and performs classification and regression prediction through the returned outs_dec.
Predict
In DETRHead's forward_single(), the predictions are obtained through the following code:
outs_dec, _ = self.transformer(x, masks, self.query_embedding.weight,
                               pos_embed)

all_cls_scores = self.fc_cls(outs_dec)
all_bbox_preds = self.fc_reg(self.activate(
    self.reg_ffn(outs_dec))).sigmoid()
The shapes of the obtained all_cls_scores and all_bbox_preds are [num_layer, bs, num_query, 81] and [num_layer, bs, num_query, 4] respectively.
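The two heads are lightweight: classification is a single linear layer to 81 channels, and regression is a small FFN followed by a linear layer to 4 values with a sigmoid. A rough torch-only stand-in (the real DETRHead builds these with mmcv helpers, so layer counts may differ):

import torch
from torch import nn

embed_dims, num_classes, num_query = 256, 80, 100

# rough stand-ins for the prediction heads (not the exact mmdet definitions)
fc_cls = nn.Linear(embed_dims, num_classes + 1)   # 81 = 80 classes + background
reg_ffn = nn.Linear(embed_dims, embed_dims)       # placeholder for the regression FFN
activate = nn.ReLU(inplace=True)
fc_reg = nn.Linear(embed_dims, 4)

outs_dec = torch.randn(6, 2, num_query, embed_dims)            # [num_layer, bs, num_query, dim]
all_cls_scores = fc_cls(outs_dec)                              # [6, 2, 100, 81]
all_bbox_preds = fc_reg(activate(reg_ffn(outs_dec))).sigmoid() # [6, 2, 100, 4], normalized cxcywh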
Loss
After getting the forward prediction results, the next step is to perform Hungarian matching and calculate the loss.
The calling logic for computing the loss is as follows: first enter the loss() function of DETRHead; in loss(), loss_single() is called through the multi_apply function to compute the loss of each decoder layer; loss_single() calls get_targets() to obtain the targets for the whole batch; get_targets() calls _get_target_single() to obtain the targets of each image in the batch, and Hungarian matching is performed inside _get_target_single().
Let's go directly to _get_target_single(). In _get_target_single(), Hungarian matching is performed first. This part of the code is in mmdet/core/bbox/assigners/hungarian_assigner.py; the comments are written in the code.
def assign(self,
           bbox_pred,
           cls_pred,
           gt_bboxes,
           gt_labels,
           img_meta,
           gt_bboxes_ignore=None,
           eps=1e-7):
    assert gt_bboxes_ignore is None, \
        'Only case when gt_bboxes_ignore is None is supported.'
    num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0)

    # 1. assign -1 by default
    assigned_gt_inds = bbox_pred.new_full((num_bboxes, ),
                                          -1,
                                          dtype=torch.long)
    assigned_labels = bbox_pred.new_full((num_bboxes, ),
                                         -1,
                                         dtype=torch.long)
    if num_gts == 0 or num_bboxes == 0:
        # No ground truth or boxes, return empty assignment
        if num_gts == 0:
            # No ground truth, assign all to background
            assigned_gt_inds[:] = 0
        return AssignResult(
            num_gts, assigned_gt_inds, None, labels=assigned_labels)
    img_h, img_w, _ = img_meta['img_shape']
    factor = gt_bboxes.new_tensor([img_w, img_h, img_w,
                                   img_h]).unsqueeze(0)

    # 2. compute the weighted costs
    # classification and bbox cost
    # cls_pred.shape: [100, 81], gt_labels: [5]
    cls_cost = self.cls_cost(cls_pred, gt_labels)
    # regression L1 cost
    # bbox_pred is normalized to 0-1, so gt_bboxes must be scaled to the
    # 0-1 range before computing reg_cost
    normalize_gt_bboxes = gt_bboxes / factor
    reg_cost = self.reg_cost(bbox_pred, normalize_gt_bboxes)
    # regression iou cost, defaultly giou is used in official DETR
    # convert the predictions back to the original image scale before
    # computing iou_cost
    bboxes = bbox_cxcywh_to_xyxy(bbox_pred) * factor
    iou_cost = self.iou_cost(bboxes, gt_bboxes)
    # weighted sum of the above three costs
    # cost.shape: [100, 5], 100 predicted boxes against 5 gt boxes
    cost = cls_cost + reg_cost + iou_cost

    # 3. do Hungarian matching on CPU using linear_sum_assignment
    cost = cost.detach().cpu()
    if linear_sum_assignment is None:
        raise ImportError('Please run "pip install scipy" '
                          'to install scipy first.')
    matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
    matched_row_inds = torch.from_numpy(matched_row_inds).to(
        bbox_pred.device)
    matched_col_inds = torch.from_numpy(matched_col_inds).to(
        bbox_pred.device)

    # 4. assign backgrounds and foregrounds
    # assign all indices to backgrounds first
    assigned_gt_inds[:] = 0
    # assign foregrounds based on matching results
    assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
    assigned_labels[matched_row_inds] = gt_labels[matched_col_inds]
    return AssignResult(
        num_gts, assigned_gt_inds, None, labels=assigned_labels)
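The core of the matching is scipy's linear_sum_assignment, which finds the assignment of predictions to gt boxes with the minimum total cost. A small standalone example with a toy cost matrix:

import numpy as np
from scipy.optimize import linear_sum_assignment

# Toy cost matrix: 4 predictions (rows) vs 2 gt boxes (columns).
cost = np.array([[0.9, 0.2],
                 [0.4, 0.8],
                 [0.1, 0.7],
                 [0.6, 0.3]])
row_ind, col_ind = linear_sum_assignment(cost)
print(row_ind)                        # [0 2] -> prediction indices matched to a gt
print(col_ind)                        # [1 0] -> gt index each of them is matched to
print(cost[row_ind, col_ind].sum())   # 0.3, the minimum total cost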
After returning to _get_target_single(), once the positive and negative samples have been matched, the matching results can be organized and returned. _get_target_single() returns a tuple **(labels, label_weights, bbox_targets, bbox_weights, pos_inds, neg_inds)** (see the sketch after this list):
labels: the categories of the 100 predicted boxes after matching; most of them are the background category.
label_weights: the weight of each label, all 1 here.
bbox_targets: the regression targets of the predicted boxes after matching; only the entries matched to a gt box are filled with the (normalized) gt box values, and the rest are all 0.
bbox_weights: the weights of the boxes; the entries matched to a gt box are 1, and the rest are all 0.
pos_inds: the indices of the positive samples; neg_inds: the indices of the negative samples.
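Conceptually, _get_target_single() turns the assignment result into these dense per-query targets roughly as follows (a simplified sketch with assumed values, not the actual mmdet code):

import torch

num_query, num_classes = 100, 80
# Illustrative assignment result (hypothetical values):
pos_inds = torch.tensor([3, 57])             # queries matched to a gt
pos_assigned_gt_inds = torch.tensor([1, 0])  # which gt each matched query got
gt_labels = torch.tensor([15, 2])
gt_bboxes_normalized = torch.rand(2, 4)      # gt boxes scaled to 0-1, cxcywh

labels = torch.full((num_query,), num_classes, dtype=torch.long)  # background by default
labels[pos_inds] = gt_labels[pos_assigned_gt_inds]

bbox_targets = torch.zeros(num_query, 4)
bbox_targets[pos_inds] = gt_bboxes_normalized[pos_assigned_gt_inds]

bbox_weights = torch.zeros(num_query, 4)
bbox_weights[pos_inds] = 1.0

label_weights = torch.ones(num_query)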
After _get_target_single() has been executed batch_size times, the code returns to get_targets(), and get_targets() returns the labels and bboxes of this batch to loss_single().
def loss_single(self,
                cls_scores,
                bbox_preds,
                gt_bboxes_list,
                gt_labels_list,
                img_metas,
                gt_bboxes_ignore_list=None):
    num_imgs = cls_scores.size(0)
    cls_scores_list = [cls_scores[i] for i in range(num_imgs)]
    bbox_preds_list = [bbox_preds[i] for i in range(num_imgs)]
    cls_reg_targets = self.get_targets(cls_scores_list, bbox_preds_list,
                                       gt_bboxes_list, gt_labels_list,
                                       img_metas, gt_bboxes_ignore_list)
    (labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
     num_total_pos, num_total_neg) = cls_reg_targets
    # concatenate the results of the images in the batch
    labels = torch.cat(labels_list, 0)                # [bs*num_query]
    label_weights = torch.cat(label_weights_list, 0)  # [bs*num_query]
    bbox_targets = torch.cat(bbox_targets_list, 0)    # [bs*num_query, 4]
    bbox_weights = torch.cat(bbox_weights_list, 0)    # [bs*num_query, 4]

    # classification loss
    # self.cls_out_channels: 81
    cls_scores = cls_scores.reshape(-1, self.cls_out_channels)  # [bs*num_query, 81]
    # construct weighted avg_factor to match with the official DETR repo
    cls_avg_factor = num_total_pos * 1.0 + \
        num_total_neg * self.bg_cls_weight
    if self.sync_cls_avg_factor:
        cls_avg_factor = reduce_mean(
            cls_scores.new_tensor([cls_avg_factor]))
    # cls_avg_factor: the average factor used to average the loss
    cls_avg_factor = max(cls_avg_factor, 1)
    # self.loss_cls: CrossEntropyLoss()
    loss_cls = self.loss_cls(
        cls_scores, labels, label_weights, avg_factor=cls_avg_factor)

    # Compute the average number of gt boxes across all gpus, for
    # normalization purposes
    num_total_pos = loss_cls.new_tensor([num_total_pos])
    num_total_pos = torch.clamp(reduce_mean(num_total_pos), min=1).item()

    # construct factors used for rescaling bboxes
    # compute the scaling factor of each image in the batch
    factors = []
    for img_meta, bbox_pred in zip(img_metas, bbox_preds):
        img_h, img_w, _ = img_meta['img_shape']
        factor = bbox_pred.new_tensor([img_w, img_h, img_w,
                                       img_h]).unsqueeze(0).repeat(
                                           bbox_pred.size(0), 1)
        factors.append(factor)
    factors = torch.cat(factors, 0)  # [bs*num_query, 4]

    # DETR regresses the relative position of boxes (cxcywh) in the image,
    # thus the learning target is normalized by the image size. So here
    # we need to re-scale them for calculating the IoU loss.
    # The IoU loss is computed at the original image scale in xyxy format.
    bbox_preds = bbox_preds.reshape(-1, 4)
    bboxes = bbox_cxcywh_to_xyxy(bbox_preds) * factors
    bboxes_gt = bbox_cxcywh_to_xyxy(bbox_targets) * factors

    # regression IoU loss, defaultly GIoU loss
    loss_iou = self.loss_iou(
        bboxes, bboxes_gt, bbox_weights, avg_factor=num_total_pos)

    # regression L1 loss
    # The L1 loss is computed on the 0-1 scale in [cx, cy, w, h] format.
    loss_bbox = self.loss_bbox(
        bbox_preds, bbox_targets, bbox_weights, avg_factor=num_total_pos)
    return loss_cls, loss_bbox, loss_iou
The loss_single() function is executed 6 times, because the loss is computed for every decoder layer. The loss of the last decoder layer is used as the main loss, and the losses of the other layers are kept as auxiliary losses (d0.loss_cls, etc.); all of them participate in back-propagation.
Finally, the code goes back to the loss() function, which simply collects the losses into loss_dict and returns it.
losses_cls, losses_bbox, losses_iou = multi_apply(
    self.loss_single, all_cls_scores, all_bbox_preds,
    all_gt_bboxes_list, all_gt_labels_list, img_metas_list,
    all_gt_bboxes_ignore_list)

loss_dict = dict()
# loss from the last decoder layer
loss_dict['loss_cls'] = losses_cls[-1]
loss_dict['loss_bbox'] = losses_bbox[-1]
loss_dict['loss_iou'] = losses_iou[-1]
# loss from other decoder layers
num_dec_layer = 0
for loss_cls_i, loss_bbox_i, loss_iou_i in zip(losses_cls[:-1],
                                               losses_bbox[:-1],
                                               losses_iou[:-1]):
    loss_dict[f'd{num_dec_layer}.loss_cls'] = loss_cls_i
    loss_dict[f'd{num_dec_layer}.loss_bbox'] = loss_bbox_i
    loss_dict[f'd{num_dec_layer}.loss_iou'] = loss_iou_i
    num_dec_layer += 1
return loss_dict
After the loss is computed, the gradients are back-propagated, the parameters are updated, and the next forward pass begins.