Source code for pymove.semantic.semantic

"""
Semantic operations.

outliers,
create_or_update_out_of_the_bbox,
create_or_update_gps_deactivated_signal,
create_or_update_gps_jump,
create_or_update_short_trajectory,
create_or_update_gps_block_signal,
filter_block_signal_by_repeated_amount_of_points,
filter_block_signal_by_time,
filter_longer_time_to_stop_segment_by_id

"""
from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
from pandas import DataFrame

from pymove.preprocessing import filters, segmentation, stay_point_detection
from pymove.utils.constants import (
    BLOCK,
    DEACTIVATED,
    DIST_PREV_TO_NEXT,
    DIST_TO_NEXT,
    DIST_TO_PREV,
    JUMP,
    OUT_BBOX,
    OUTLIER,
    SEGMENT_STOP,
    SHORT,
    TID_PART,
    TIME_TO_PREV,
    TRAJ_ID,
)
from pymove.utils.log import logger, timer_decorator

if TYPE_CHECKING:
    from pymove.core.dask import DaskMoveDataFrame
    from pymove.core.pandas import PandasMoveDataFrame


def _end_create_operation(
    move_data: DataFrame, new_label: str, inplace: bool
) -> DataFrame | None:
    """
    Finishes a create operation: logs the result and picks the return value.

    Parameters
    ----------
    move_data: dataframe
        The trajectories data, already holding the `new_label` column.
    new_label: string
        The name of the column whose value counts are written to the debug log.
    inplace : boolean
        Whether the calling operation worked on the caller's own dataframe.

    Returns
    -------
    DataFrame
        `move_data` itself (no copy is made here) when `inplace` is False,
        otherwise None

    """
    label_counts = move_data[new_label].value_counts()
    logger.debug(label_counts)

    # In-place operations follow the pandas convention of returning None.
    return None if inplace else move_data


def _process_simple_filter(
    move_data: DataFrame, new_label: str, feature: str, value: float, inplace: bool
) -> DataFrame | None:
    """
    Processes create operation with simple filter.

    Every row whose `feature` is greater than or equal to `value` is flagged
    in `new_label`, together with the row whose index label is one less
    (the previous point when the index is a default integer range).

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data. It is modified in place; callers are
        expected to pass a copy when the original must be preserved.
    new_label: string
        The name of the boolean column to create or overwrite.
    feature: string
        Feature column to compare
    value: float
        Value to compare feature
    inplace : boolean
        Only controls the return value: None when True, `move_data` when False.

    Returns
    -------
    DataFrame
        DataFrame with the additional features or None

    """
    move_data[new_label] = False

    filter_ = move_data[feature] >= value
    idx_start = move_data[filter_].index
    # Label of the point right before each match (assumes an integer index).
    idx_end = idx_start - 1
    idx = np.concatenate([idx_start, idx_end], axis=0)

    # `.at` only supports scalar access, so use a boolean mask with `.loc`.
    # The mask also ignores "previous" labels that do not exist in the index
    # (e.g. -1 when the very first row matches) instead of raising KeyError.
    move_data.loc[move_data.index.isin(idx), new_label] = True

    return _end_create_operation(
        move_data, new_label, inplace
    )


@timer_decorator
def outliers(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    jump_coefficient: float = 3.0,
    threshold: float = 1,
    new_label: str = OUTLIER,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a boolean feature to detect outliers.

    A point is flagged when its distances to the previous point, to the next
    point and between previous and next points all exceed `threshold`, and
    both of its own distances exceed `jump_coefficient` times the distance
    between the previous and the next point.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    jump_coefficient : float, optional
        Factor applied to the previous-to-next distance before comparing it
        with the distances to the previous and next points, by default 3
    threshold : float, optional
        Minimum value that the distance features must have
        in order to be considered outliers, by default 1
    new_label: string, optional
        The name of the new feature with detected outliers,
        by default OUTLIER
    inplace : bool, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Returns a dataframe with the trajectories outliers or None

    """
    if not inplace:
        move_data = move_data.copy()

    if DIST_TO_PREV not in move_data:
        move_data.generate_dist_features()

    if move_data.index.name is not None:
        logger.debug('...Reset index for filtering\n')
        move_data.reset_index(inplace=True)

    # Every distance column must be checked for membership; a bare column
    # name is a non-empty string and therefore always truthy.
    required = (DIST_TO_PREV, DIST_TO_NEXT, DIST_PREV_TO_NEXT)
    if all(column in move_data for column in required):
        jump = jump_coefficient * move_data[DIST_PREV_TO_NEXT]
        filter_ = (
            (move_data[DIST_TO_NEXT] > threshold)
            & (move_data[DIST_TO_PREV] > threshold)
            & (move_data[DIST_PREV_TO_NEXT] > threshold)
            & (jump < move_data[DIST_TO_NEXT])
            & (jump < move_data[DIST_TO_PREV])
        )
        move_data[new_label] = filter_
    else:
        logger.warning('...Distances features were not created')

    if not inplace:
        return move_data
@timer_decorator
def create_or_update_out_of_the_bbox(
    move_data: DataFrame,
    bbox: tuple[int, int, int, int],
    new_label: str = OUT_BBOX,
    inplace: bool = False
) -> DataFrame | None:
    """
    Create or update a boolean feature to detect points out of the bbox.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    bbox : tuple
        Tuple of 4 elements, containing the minimum and maximum values
        of latitude and longitude of the bounding box.
    new_label: string, optional
        The name of the new feature with detected points out of the bbox,
        by default OUT_BBOX
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Returns dataframe with a boolean feature with detected
        points out of the bbox, or None

    Raises
    ------
    ValueError
        If feature generation fails

    """
    if not inplace:
        move_data = move_data.copy()

    logger.debug(
        '\nCreate or update boolean feature to detect points out of the bbox'
    )
    filtered_ = filters.by_bbox(move_data, bbox, filter_out=True)

    if filtered_ is None:
        raise ValueError('Filter bbox failed!')

    logger.debug('...Creating a new label named as %s', new_label)
    move_data[new_label] = False

    if filtered_.shape[0] > 0:
        # '%s' (not '% a') so the label is printed verbatim.
        logger.debug('...Setting %s as True\n', new_label)
        # `.loc` accepts a collection of row labels; `.at` is scalar-only.
        move_data.loc[filtered_.index, new_label] = True

    return _end_create_operation(
        move_data, new_label, inplace
    )
@timer_decorator
def create_or_update_gps_deactivated_signal(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_between_adj_points: float = 7200,
    new_label: str = DEACTIVATED,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Creates a new feature that flags points around a long gap in time.

    The time features are (re)generated and the comparison of the time to
    the previous point against `max_time_between_adj_points` is delegated
    to `_process_simple_filter`.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    max_time_between_adj_points: float, optional
        The threshold for the time between adjacent points, by default 7200
    new_label: string, optional
        The name of the new feature with detected deactivated signals,
        by default DEACTIVATED
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional features or None.
        Presumably includes the time features produced by
        `generate_time_features` plus the `new_label` column.

    """
    data = move_data if inplace else move_data.copy()

    logger.debug(
        'Create or update deactivated signal if time max > %s seconds\n'
        % max_time_between_adj_points
    )
    data.generate_time_features()

    return _process_simple_filter(
        data,
        new_label,
        feature=TIME_TO_PREV,
        value=max_time_between_adj_points,
        inplace=inplace,
    )
@timer_decorator
def create_or_update_gps_jump(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_dist_between_adj_points: float = 3000,
    new_label: str = JUMP,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Creates a new feature that flags points around a gps jump.

    The distance features are (re)generated and the comparison of the
    distance to the previous point against `max_dist_between_adj_points`
    is delegated to `_process_simple_filter`.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    max_dist_between_adj_points: float, optional
        The threshold for the distance between adjacent points,
        by default 3000
    new_label: string, optional
        The name of the new feature with detected jumps, by default JUMP
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional features or None.
        Presumably includes the distance features produced by
        `generate_dist_features` plus the `new_label` column.

    """
    data = move_data if inplace else move_data.copy()

    logger.debug(
        'Create or update jump if dist max > %s meters\n'
        % max_dist_between_adj_points
    )
    data.generate_dist_features()

    return _process_simple_filter(
        data,
        new_label,
        feature=DIST_TO_PREV,
        value=max_dist_between_adj_points,
        inplace=inplace,
    )
@timer_decorator
def create_or_update_short_trajectory(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_dist_between_adj_points: float = 3000,
    max_time_between_adj_points: float = 7200,
    max_speed_between_adj_points: float = 50,
    k_segment_max: int = 50,
    label_tid: str = TID_PART,
    new_label: str = SHORT,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Creates a new feature that inform if point belongs to a short trajectory.

    The data is segmented by distance, time and speed; every segment whose
    number of points is at most `k_segment_max` is flagged as short.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    max_dist_between_adj_points : float, optional
        Maximum distance between adjacent points, forwarded to the
        segmentation, by default 3000
    max_time_between_adj_points : float, optional
        Maximum travel time between adjacent points, forwarded to the
        segmentation, by default 7200
    max_speed_between_adj_points : float, optional
        Maximum speed between adjacent points, forwarded to the
        segmentation, by default 50
    k_segment_max: int, optional
        Largest number of points a segment may have to still be flagged
        as short, by default 50
    label_tid: str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    new_label: str, optional
        The name of the new feature with short trajectories,
        by default SHORT
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional features or None.
        Includes the `label_tid` and `new_label` columns, plus whatever
        features the segmentation step generates.

    """
    data = move_data if inplace else move_data.copy()

    logger.debug('\nCreate or update short trajectories...')

    segmentation.by_dist_time_speed(
        data,
        max_dist_between_adj_points=max_dist_between_adj_points,
        max_time_between_adj_points=max_time_between_adj_points,
        max_speed_between_adj_points=max_speed_between_adj_points,
        label_new_tid=label_tid,
        inplace=True
    )

    data[new_label] = False

    # Number of points per segment id.
    points_per_tid = data.groupby(by=label_tid).size()
    short_tids = points_per_tid[points_per_tid <= k_segment_max].index
    belongs_to_short = data[label_tid].isin(short_tids)
    data.loc[belongs_to_short, new_label] = True

    return _end_create_operation(data, new_label, inplace)
@timer_decorator
def create_or_update_gps_block_signal(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_stop: float = 7200,
    new_label: str = BLOCK,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Creates a new feature that inform segments with periods without moving.

    The data is segmented with a maximum distance of 0.0 between adjacent
    points; every segment whose summed time to the previous point reaches
    `max_time_stop` is flagged.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    max_time_stop: float, optional
        Minimum accumulated time of a segment for it to be flagged,
        by default 7200
    new_label: string, optional
        The name of the new feature with detected blocked signals,
        by default BLOCK
    label_tid : str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the additional features or None.
        Includes the `label_tid` and `new_label` columns, plus the
        features produced by `generate_dist_time_speed_features`.

    """
    data = move_data if inplace else move_data.copy()

    logger.debug(
        'Create or update block_signal if max time stop > %s seconds\n'
        % max_time_stop
    )
    segmentation.by_max_dist(
        data,
        max_dist_between_adj_points=0.0,
        label_new_tid=label_tid,
        inplace=True
    )

    logger.debug('Updating dist time speed values')
    data.generate_dist_time_speed_features(label_id=label_tid)

    data[new_label] = False

    # Total time accumulated inside each segment.
    time_per_tid = data.groupby(by=label_tid).agg({TIME_TO_PREV: 'sum'})
    is_blocked = time_per_tid[TIME_TO_PREV] >= max_time_stop
    blocked_tids = time_per_tid[is_blocked].index
    data.loc[data[label_tid].isin(blocked_tids), new_label] = True

    return _end_create_operation(data, new_label, inplace)
@timer_decorator
def filter_block_signal_by_repeated_amount_of_points(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    amount_max_of_points_stop: float = 30.0,
    max_time_stop: float = 7200,
    filter_out: bool = False,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe points with blocked signal by amount of points.

    Segments are selected when their number of blocked points is greater
    than `amount_max_of_points_stop`. By default the rows of the selected
    segments are dropped; with `filter_out` the rows of every other segment
    are dropped instead.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    amount_max_of_points_stop: float, optional
        Maximum number of stopped points, by default 30
    max_time_stop: float, optional
        Maximum time allowed with speed 0, only used to create the BLOCK
        feature when it is missing, by default 7200
    filter_out: boolean, optional
        If set to True, it will return trajectory points with blocked signal,
        by default False
    label_tid : str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    if not inplace:
        move_data = move_data.copy()

    if BLOCK not in move_data:
        create_or_update_gps_block_signal(
            move_data, max_time_stop, label_tid=label_tid, inplace=True
        )

    # Sum only the BLOCK flag: summing every column is wasted work and fails
    # on non-numeric columns (e.g. datetimes) in recent pandas versions.
    blocked_per_tid = move_data.groupby(by=label_tid)[BLOCK].sum()
    filter_ = blocked_per_tid > amount_max_of_points_stop

    if filter_out:
        idx = blocked_per_tid[~filter_].index
    else:
        idx = blocked_per_tid[filter_].index

    rows_to_drop = move_data[move_data[label_tid].isin(idx)].index
    move_data.drop(index=rows_to_drop, inplace=True)

    if not inplace:
        return move_data
@timer_decorator
def filter_block_signal_by_time(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_stop: float = 7200,
    filter_out: bool = False,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe points with blocked signal by time.

    Segments are selected when their summed time to the previous point is
    greater than `max_time_stop` and they hold at least one blocked point.
    By default the rows of the selected segments are dropped; with
    `filter_out` the rows of every other segment are dropped instead.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    max_time_stop: float, optional
        Maximum time allowed with speed 0, by default 7200
    filter_out: boolean, optional
        If set to True, it will return trajectory points with blocked signal,
        by default False
    label_tid : str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    data = move_data if inplace else move_data.copy()

    if BLOCK not in data:
        create_or_update_gps_block_signal(
            data, max_time_stop, label_tid=label_tid, inplace=True
        )

    per_tid = data.groupby(by=label_tid).agg(
        {TIME_TO_PREV: 'sum', BLOCK: 'sum'}
    )
    exceeds_time = per_tid[TIME_TO_PREV] > max_time_stop
    has_block = per_tid[BLOCK] > 0
    selected = exceeds_time & has_block

    # Pick the segments whose rows will be removed.
    tids_to_drop = per_tid[~selected if filter_out else selected].index

    rows_to_drop = data[data[label_tid].isin(tids_to_drop)].index
    data.drop(index=rows_to_drop, inplace=True)

    return None if inplace else data
@timer_decorator
def filter_longer_time_to_stop_segment_by_id(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    dist_radius: float = 30,
    time_radius: float = 900,
    label_id: str = TRAJ_ID,
    label_segment_stop: str = SEGMENT_STOP,
    filter_out: bool = False,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe segment with longest stop time.

    For each `label_id`, the segment with the largest summed time to the
    previous point is selected. By default the rows of the selected segments
    are dropped; with `filter_out` the rows of every other segment are
    dropped instead.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    dist_radius : float, optional
        The distance forwarded to the stop detection, only used when
        `label_segment_stop` is missing, by default 30
    time_radius : float, optional
        The time forwarded to the stop detection, only used when
        `label_segment_stop` is missing, by default 900
    label_id : str, optional
        The label of the column containing the trajectory ids,
        by default TRAJ_ID
    label_segment_stop: str, optional
        The label of the column containing the stop segment ids,
        by default SEGMENT_STOP
    filter_out: boolean, optional
        If set to True, it will return trajectory points with longer time,
        by default False
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    data = move_data if inplace else move_data.copy()

    if label_segment_stop not in data:
        stay_point_detection.create_or_update_move_stop_by_dist_time(
            data, dist_radius, time_radius, inplace=True
        )

    # Total time spent in each (id, segment) pair.
    time_per_segment = data.groupby(
        [label_id, label_segment_stop], as_index=False
    ).agg({TIME_TO_PREV: 'sum'})

    # Row position, inside the aggregate, of the longest segment of each id.
    longest_rows = time_per_segment.groupby(
        [label_id], as_index=False
    ).idxmax()[TIME_TO_PREV]

    is_longest = time_per_segment.index.isin(longest_rows)
    chosen = ~is_longest if filter_out else is_longest
    segments = time_per_segment.loc[chosen][label_segment_stop]

    # NOTE(review): rows are matched on the segment value alone, not on the
    # (id, segment) pair - confirm segment ids are unique across ids.
    rows_to_drop = data[data[label_segment_stop].isin(segments)].index
    data.drop(index=rows_to_drop, inplace=True)

    return None if inplace else data