Source code for reparo.order

#!/usr/bin/env python
# -*- coding: utf-8 -*-

'''
Frame ordering
'''

import numpy as np
import cv2
from scipy.spatial import distance


def chain_frames(dist_matrix, first_frame=0, valid_idxs=None):
    """Greedily chain frames by repeatedly jumping to the closest unused frame

    Args:
        dist_matrix (numpy.ndarray[N, N]): frame codistance matrix
        first_frame (int, optional): index of the frame the chain starts from
            (always part of the result, even if absent from `valid_idxs`)
        valid_idxs (iterable<int>, optional): frame indices allowed in the chain.
            When None, every frame is a candidate

    Returns:
        list<int>: ordered frame indices
    """

    # Sets give O(1) membership tests inside the double loop
    allowed = None if valid_idxs is None else {int(idx) for idx in valid_idxs}
    used_frames = [first_frame]
    seen = {first_frame}
    current_frame = first_frame

    # For each frame, find the best match that has not been used yet
    for _ in range(dist_matrix.shape[0] - 1):
        for frame in dist_matrix[current_frame].argsort():
            # Plain ints keep the returned list homogeneous
            frame = int(frame)
            # Skip frames that are already chained or not allowed
            if frame in seen or (allowed is not None and frame not in allowed):
                continue
            used_frames.append(frame)
            seen.add(frame)
            current_frame = frame
            break
        else:
            # No candidate left: further iterations could not add anything
            break

    return used_frames
def get_chain_split(used_frames, dist_matrix):
    """Locate the largest jump between consecutive frames of a chain

    The chain is treated as a cycle: the pair made of the last and the first
    frame is also considered.

    Args:
        used_frames (list<int>): ordered frame indices (must not be empty,
            otherwise an IndexError is raised)
        dist_matrix (numpy.ndarray[N, N]): frame codistance matrix

    Returns:
        tuple<int, int>: pair of frame indices around the sequence split
    """

    # Default to the wrap-around pair so that a result is always defined,
    # even when no consecutive distance is strictly positive
    split = (used_frames[-1], used_frames[0])
    max_dist = 0

    # Each frame is paired with its predecessor (the first one with the last)
    previous_frames = used_frames[-1:] + used_frames[:-1]
    for prev_idx, current_idx in zip(previous_frames, used_frames):
        pair_dist = dist_matrix[prev_idx][current_idx]
        # Take the maximum distance between consecutive frames
        if pair_dist > max_dist:
            max_dist = pair_dist
            # Take the frame before and after the split
            split = (prev_idx, current_idx)

    return split
def _chain_cost(chain, dist_matrix):
    """Sum the distances between consecutive frames of a chain"""
    return sum(dist_matrix[prev_idx, current_idx]
               for prev_idx, current_idx in zip(chain[:-1], chain[1:]))


def reorder_frames(frames, metric='cosine', target_size=(128, 128), denoise=False):
    """Reorder frames in a coherent fashion

    Args:
        frames (list<numpy.ndarray[H, W, 3]>): video frames
        metric (str, optional): metric used for codistance matrix
        target_size (tuple<int, int>, optional): frame size used for codistance computation
        denoise (bool, optional): whether the frames need to be denoised

    Returns:
        list<numpy.ndarray[H, W, 3]>: video frames
    """

    # Nothing to order with fewer than two frames
    if len(frames) < 2:
        return list(frames)

    # Don't use full resolution images if not needed
    resized_frames = np.asarray([cv2.resize(f.copy(), target_size).flatten() for f in frames])
    dist_matrix = distance.cdist(resized_frames, resized_frames, metric)

    # Take the closest unused frame and add it to the chain
    frame_chain = chain_frames(dist_matrix, 0)

    # Remove frames with consecutive frame distance much higher than the median value
    if denoise:
        consec_dist = [dist_matrix[prev_idx, current_idx]
                       for prev_idx, current_idx in zip(frame_chain[:-1], frame_chain[1:])]
        # The threshold does not depend on the frame: compute it once
        threshold = 5 * np.median(consec_dist)
        # NOTE(review): each frame is judged on the distance to its successor, so the
        # last frame of the chain is always discarded here - confirm this is intended
        frame_chain = [elt for elt, dist in zip(frame_chain[:-1], consec_dist) if dist < threshold]

    # Denoising may leave too few frames to look for a sequence split
    if len(frame_chain) < 2:
        return [frames[idx] for idx in frame_chain]

    # Since we most likely didn't start with the actual first frame, there are at least
    # two continuous sequences here
    # Let's spot the sequence split using the maximum distance between consecutive frames
    last_frame, first_frame = get_chain_split(frame_chain, dist_matrix)

    # Check both orders
    first_chain = chain_frames(dist_matrix, first_frame, valid_idxs=frame_chain)
    second_chain = chain_frames(dist_matrix, last_frame, valid_idxs=frame_chain)

    # Take the optimum in terms of sum of consecutive frame distance
    first_loss = _chain_cost(first_chain, dist_matrix)
    second_loss = _chain_cost(second_chain, dist_matrix)
    frame_chain = first_chain if first_loss < second_loss else second_chain

    return [frames[idx] for idx in frame_chain]