Source code for pymove.preprocessing.segmentation

"""
Segmentation operations.

bbox_split,
by_dist_time_speed,
by_max_dist,
by_max_time,
by_max_speed

"""
from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from numpy import ndarray
from pandas.core.frame import DataFrame

from pymove.utils.constants import (
    DIST_TO_PREV,
    SPEED_TO_PREV,
    TID_DIST,
    TID_PART,
    TID_SPEED,
    TID_TIME,
    TIME_TO_PREV,
    TRAJ_ID,
)
from pymove.utils.log import logger, progress_bar, timer_decorator

if TYPE_CHECKING:
    from pymove.core.dask import DaskMoveDataFrame
    from pymove.core.pandas import PandasMoveDataFrame


@timer_decorator
def bbox_split(
    bbox: tuple[float, float, float, float], number_grids: int
) -> DataFrame:
    """
    Splits the bounding box in N grids of the same size.

    The box is sliced along the longitude axis only: every grid keeps the
    latitude limits of ``bbox`` and covers an equal share of its longitude
    range.

    Parameters
    ----------
    bbox: tuple
        Tuple of 4 elements, read in the order
        (lat_min, lon_min, lat_max, lon_max).
    number_grids: int
        Determines the number of grids to split the bounding box.

    Returns
    -------
    DataFrame
        One row per grid, with the columns
        'lat_min', 'lon_min', 'lat_max' and 'lon_max'.

    """
    lat_min = bbox[0]
    lon_min = bbox[1]
    lat_max = bbox[2]
    lon_max = bbox[3]

    # Width of the range is |max - min|. Subtracting the absolute values
    # instead collapses to 0 for a box that straddles the equator/meridian.
    const_lat = abs(lat_max - lat_min) / number_grids
    const_lon = abs(lon_max - lon_min) / number_grids
    logger.debug(f'const_lat: {const_lat}\nconst_lon: {const_lon}')

    columns = ['lat_min', 'lon_min', 'lat_max', 'lon_max']
    # Build all rows first and create the frame once: DataFrame.append no
    # longer exists in pandas >= 2.0 and copied the frame on every call.
    rows = [
        {
            'lat_min': lat_min,
            'lon_min': lon_min + (const_lon * i),
            'lat_max': lat_max,
            'lon_max': lon_min + (const_lon * (i + 1)),
        }
        for i in range(number_grids)
    ]
    return pd.DataFrame(rows, columns=columns)


def _drop_single_point(move_data: DataFrame, label_new_tid: str, label_id: str): """ Removes trajectory with single point. Parameters ---------- move_data: dataframe dataframe with trajectories label_new_tid : str The label of the column containing the ids of the formed segments. Is the new splitted id. label_id : str Indicates the label of the id column in the user dataframe, by default TRAJ_ID """ shape_before_drop = move_data.shape idx = move_data[move_data[label_new_tid] == -1].index if idx.shape[0] > 0: logger.debug('...Drop Trajectory with a unique GPS point\n') ids_before_drop = move_data[label_id].unique().shape[0] move_data.drop(index=idx, inplace=True) logger.debug( '...Object - before drop: {} - after drop: {}'.format( ids_before_drop, move_data[label_id].unique().shape[0] ) ) logger.debug( '...Shape - before drop: {} - after drop: {}'.format( shape_before_drop, move_data.shape ) ) else: logger.debug('...No trajectories with only one point.') def _filter_and_dist_time_speed( move_data: DataFrame, idx: int, max_dist: float, max_time: float, max_speed: float ) -> ndarray: """ Filters the dataframe considering thresholds for time, dist and speed. Parameters ---------- move_data : dataframe Dataframe to be filtered idx : int row to compare max_dist : float maximum dist diference max_time : float maximum time diference max_speed : float maximum speed diference Returns ------- numpy.ndarray of booleans filtered indexes from the dataframe """ return ( (np.nan_to_num(move_data.at[idx, DIST_TO_PREV]) > max_dist) | (np.nan_to_num(move_data.at[idx, TIME_TO_PREV]) > max_time) | (np.nan_to_num(move_data.at[idx, SPEED_TO_PREV]) > max_speed) ) def _filter_or_dist_time_speed( move_data: DataFrame, idx: int, feature: str, max_between_adj_points: float ) -> ndarray: """ Filters the dataframe considering thresholds for time, dist and speed. 
Parameters ---------- move_data : dataframe Dataframe to be filtered idx : int row to compare feature : str feature to compare max_between_adj_points : float maximum points diference Returns ------- numpy.ndarray filtered indexes from the dataframe """ return np.nan_to_num(move_data.at[idx, feature]) > max_between_adj_points def _prepare_segmentation(move_data: DataFrame, label_id: str, label_new_tid: str): """ Resets the dataframe index, collects unique ids and initiates curr_id and count. Parameters ---------- move_data : dataframe Dataframe to be filtered label_id : str label of the feature label_new_tid : str label of the new feature Returns ------- int initial curr_tid numpy.ndarray unique ids int initial count """ if move_data.index.name is None: logger.debug(f'...setting {label_id} as index') move_data.set_index(label_id, inplace=True) curr_tid = 0 if label_new_tid not in move_data: move_data[label_new_tid] = curr_tid ids = move_data.index.unique() count = 0 return curr_tid, ids, count def _update_curr_tid_count( filter_: ndarray, move_data: DataFrame, idx: int, label_new_tid: str, curr_tid: int, count: int ) -> tuple[int, int]: """ Updates the tid. 
Parameters ---------- filter_ : numpy.ndarray Filtered indexes move_data : dataframe Dataframe to be filtered idx : int row to compare label_new_tid : str label of the new feature curr_tid : int current tid count : int count of Returns ------- int updated current tid int updated count ids """ curr_tid += 1 if filter_.shape == (): logger.debug(f'id: {idx} has no point to split') move_data.at[idx, label_new_tid] = curr_tid count += 1 else: tids = np.empty(filter_.shape[0], dtype=np.int64) tids.fill(curr_tid) for i, has_problem in enumerate(filter_): if has_problem: curr_tid += 1 tids[i:] = curr_tid count += tids.shape[0] move_data.at[idx, label_new_tid] = tids return curr_tid, count def _filter_by( move_data: DataFrame, label_id: str, label_new_tid: str, drop_single_points: bool, **kwargs ) -> DataFrame: """ Splits the trajectories into segments. Parameters ---------- move_data : dataframe The input trajectory data label_id : str, optional Indicates the label of the id column in the user dataframe, by default TRAJ_ID label_new_tid : str, optional(TID_PART by default) The label of the column containing the ids of the formed segments. Is the new splitted id. drop_single_points : boolean, optional(True by default) If set to True, drops the trajectories with only one point. **kwargs : arguments depends on the type of segmentation - all : if is a segmentation by all features - max_dist : maximum dist between adjacent points - max_time : maximum time between adjacent points - max_speed : maximum speed between adjacent points - feature : feature to use for segmentation - max_between_adj_points : maximum value for feature Returns ------- dataframe DataFrame with the aditional features: label_new_tid, that indicates the trajectory segment to which the point belongs to. Note ---- Time, distance and speed features must be updated after split. 
""" curr_tid, ids, count = _prepare_segmentation( move_data, label_id, label_new_tid ) for idx in progress_bar(ids, desc='Generating %s' % label_new_tid): if kwargs['all']: filter_ = _filter_and_dist_time_speed( move_data, idx, kwargs['max_dist'], kwargs['max_time'], kwargs['max_speed'] ) else: filter_ = _filter_or_dist_time_speed( move_data, idx, kwargs['feature'], kwargs['max_between_adj_points'] ) curr_tid, count = _update_curr_tid_count( filter_, move_data, idx, label_new_tid, curr_tid, count ) if label_id == label_new_tid: move_data.reset_index(drop=True, inplace=True) logger.debug('... label_tid = label_new_id, then reseting and drop index') else: move_data.reset_index(inplace=True) logger.debug('... Reseting index\n') if drop_single_points: _drop_single_point(move_data, label_new_tid, label_id) move_data.generate_dist_time_speed_features() return move_data
@timer_decorator
def by_dist_time_speed(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    label_id: str = TRAJ_ID,
    max_dist_between_adj_points: float = 3000,
    max_time_between_adj_points: float = 900,
    max_speed_between_adj_points: float = 50.0,
    drop_single_points: bool = True,
    label_new_tid: str = TID_PART,
    inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Splits the trajectories into segments based on distance, time and speed.

    A new segment starts at every point whose distance, time or speed to
    the previous point is above the corresponding threshold.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    label_id : str, optional
        Indicates the label of the id column in the user dataframe,
        by default TRAJ_ID
    max_dist_between_adj_points : float, optional
        Specify the maximum distance a point may have from the previous
        point without starting a new segment, by default 3000
    max_time_between_adj_points : float, optional
        Specify the maximum travel time between two adjacent points,
        by default 900
    max_speed_between_adj_points : float, optional
        Specify the maximum speed of travel between two adjacent points,
        by default 50
    drop_single_points : boolean, optional
        If set to True, drops the trajectories with only one point,
        by default True
    label_new_tid : str, optional
        The label of the column containing the ids of the formed segments.
        Is the new splitted id, by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional feature label_new_tid, that indicates
        the trajectory segment to which the point belongs to,
        or None when ``inplace`` is True

    Note
    ----
    Time, distance and speed features are generated before the split
    whenever any of them is missing from ``move_data``.

    """
    if not inplace:
        move_data = move_data.copy()

    logger.debug('\nSplit trajectories')
    logger.debug(
        f'...max_dist_between_adj_points: {max_dist_between_adj_points}'
    )
    logger.debug(
        f'...max_time_between_adj_points: {max_time_between_adj_points}'
    )
    logger.debug(
        f'...max_speed_between_adj_points: {max_speed_between_adj_points}'
    )

    # The segmentation reads all three features, so all of them must exist;
    # checking TIME_TO_PREV alone let a frame lacking the others fail later
    # with a KeyError.
    required_features = (DIST_TO_PREV, TIME_TO_PREV, SPEED_TO_PREV)
    if any(feature not in move_data for feature in required_features):
        move_data.generate_dist_time_speed_features()

    move_data = _filter_by(
        move_data,
        label_id,
        label_new_tid,
        drop_single_points,
        max_dist=max_dist_between_adj_points,
        max_time=max_time_between_adj_points,
        max_speed=max_speed_between_adj_points,
        all=True
    )

    if not inplace:
        return move_data
    return None


@timer_decorator
def by_max_dist(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    label_id: str = TRAJ_ID,
    max_dist_between_adj_points: float = 3000,
    drop_single_points: bool = True,
    label_new_tid: str = TID_DIST,
    inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Segments the trajectories based on distance.

    A new segment starts at every point whose DIST_TO_PREV value is above
    ``max_dist_between_adj_points``.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    label_id : str, optional
        Indicates the label of the id column in the user dataframe,
        by default TRAJ_ID
    max_dist_between_adj_points : float, optional
        Specify the maximum dist between two adjacent points,
        by default 3000
    drop_single_points : boolean, optional
        If set to True, drops the trajectories with only one point,
        by default True
    label_new_tid : str, optional
        The label of the column containing the ids of the formed segments.
        Is the new splitted id, by default TID_DIST
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional feature label_new_tid, that indicates
        the trajectory segment to which the point belongs to,
        or None when ``inplace`` is True

    Note
    ----
    The dist, time and speed features are generated before the split when
    DIST_TO_PREV is missing from ``move_data``.

    """
    target = move_data if inplace else move_data.copy()

    logger.debug(
        'Split trajectories by max distance between adjacent points: '
        f'{max_dist_between_adj_points}'
    )

    if DIST_TO_PREV not in target:
        target.generate_dist_time_speed_features()

    segmentation_args = {
        'feature': DIST_TO_PREV,
        'max_between_adj_points': max_dist_between_adj_points,
        'all': False,
    }
    segmented = _filter_by(
        target, label_id, label_new_tid, drop_single_points, **segmentation_args
    )

    return None if inplace else segmented


@timer_decorator
def by_max_time(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    label_id: str = TRAJ_ID,
    max_time_between_adj_points: float = 900.0,
    drop_single_points: bool = True,
    label_new_tid: str = TID_TIME,
    inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Splits the trajectories into segments based on a maximum time.

    A new segment starts at every point whose TIME_TO_PREV value is above
    ``max_time_between_adj_points``.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    label_id : str, optional
        Indicates the label of the id column in the users dataframe,
        by default TRAJ_ID
    max_time_between_adj_points : float, optional
        Specify the maximum time between two adjacent points,
        by default 900
    drop_single_points : boolean, optional
        If set to True, drops the trajectories with only one point,
        by default True
    label_new_tid : str, optional
        The label of the column containing the ids of the formed segments.
        Is the new splitted id, by default TID_TIME
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional feature label_new_tid, that indicates
        the trajectory segment to which the point belongs to,
        or None when ``inplace`` is True

    Note
    ----
    The dist, time and speed features are generated before the split when
    TIME_TO_PREV is missing from ``move_data``.

    """
    target = move_data if inplace else move_data.copy()

    logger.debug(
        'Split trajectories by max_time_between_adj_points: '
        f'{max_time_between_adj_points}'
    )

    if TIME_TO_PREV not in target:
        target.generate_dist_time_speed_features()

    segmentation_args = {
        'feature': TIME_TO_PREV,
        'max_between_adj_points': max_time_between_adj_points,
        'all': False,
    }
    segmented = _filter_by(
        target, label_id, label_new_tid, drop_single_points, **segmentation_args
    )

    return None if inplace else segmented


@timer_decorator
def by_max_speed(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    label_id: str = TRAJ_ID,
    max_speed_between_adj_points: float = 50.0,
    drop_single_points: bool = True,
    label_new_tid: str = TID_SPEED,
    inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Splits the trajectories into segments based on a maximum speed.

    A new segment starts at every point whose SPEED_TO_PREV value is above
    ``max_speed_between_adj_points``.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    label_id : str, optional
        Indicates the label of the id column in the users dataframe,
        by default TRAJ_ID
    max_speed_between_adj_points : float, optional
        Specify the maximum speed between two adjacent points,
        by default 50
    drop_single_points : boolean, optional
        If set to True, drops the trajectories with only one point,
        by default True
    label_new_tid : str, optional
        The label of the column containing the ids of the formed segments.
        Is the new splitted id, by default TID_SPEED
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional feature label_new_tid, that indicates
        the trajectory segment to which the point belongs to,
        or None when ``inplace`` is True

    Note
    ----
    The dist, time and speed features are generated before the split when
    SPEED_TO_PREV is missing from ``move_data``.

    """
    target = move_data if inplace else move_data.copy()

    logger.debug(
        'Split trajectories by max_speed_between_adj_points: '
        f'{max_speed_between_adj_points}'
    )

    if SPEED_TO_PREV not in target:
        target.generate_dist_time_speed_features()

    segmentation_args = {
        'feature': SPEED_TO_PREV,
        'max_between_adj_points': max_speed_between_adj_points,
        'all': False,
    }
    segmented = _filter_by(
        target, label_id, label_new_tid, drop_single_points, **segmentation_args
    )

    return None if inplace else segmented