"""
Semantic operations.
outliers,
create_or_update_out_of_the_bbox,
create_or_update_gps_deactivated_signal,
create_or_update_gps_jump,
create_or_update_short_trajectory,
create_or_update_gps_block_signal,
filter_block_signal_by_repeated_amount_of_points,
filter_block_signal_by_time,
filter_longer_time_to_stop_segment_by_id
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy as np
from pandas import DataFrame
from pymove.preprocessing import filters, segmentation, stay_point_detection
from pymove.utils.constants import (
BLOCK,
DEACTIVATED,
DIST_PREV_TO_NEXT,
DIST_TO_NEXT,
DIST_TO_PREV,
JUMP,
OUT_BBOX,
OUTLIER,
SEGMENT_STOP,
SHORT,
TID_PART,
TIME_TO_PREV,
TRAJ_ID,
)
from pymove.utils.log import logger, timer_decorator
if TYPE_CHECKING:
from pymove.core.dask import DaskMoveDataFrame
from pymove.core.pandas import PandasMoveDataFrame
def _end_create_operation(
    move_data: DataFrame, new_label: str, inplace: bool
) -> DataFrame | None:
    """
    Finish a create operation: log the new feature and hand back the data.

    Parameters
    ----------
    move_data : DataFrame
        The trajectories data, already containing the ``new_label`` column.
    new_label : str
        Name of the feature column that was just created or updated.
    inplace : bool
        Whether the caller worked directly on the original dataframe.

    Returns
    -------
    DataFrame or None
        ``move_data`` when ``inplace`` is False, otherwise None.

    """
    # Debug summary of how many rows received each value of the new feature.
    label_counts = move_data[new_label].value_counts()
    logger.debug(label_counts)

    return None if inplace else move_data
def _process_simple_filter(
    move_data: DataFrame, new_label: str, feature: str, value: float, inplace: bool
) -> DataFrame | None:
    """
    Flag the rows whose feature reaches a threshold, plus their predecessors.

    A row is marked True in ``new_label`` when ``move_data[feature] >= value``
    for that row, or for the row immediately after it. In other words, both
    ends of every offending pair of adjacent rows are flagged.

    Parameters
    ----------
    move_data : DataFrame
        The input trajectories data. It is modified in place: the column
        ``new_label`` is created or overwritten.
    new_label : str
        The name of the boolean feature to create or update.
    feature : str
        Feature column to compare.
    value : float
        Threshold the feature is compared against (inclusive).
    inplace : bool
        Forwarded to ``_end_create_operation``: when True nothing is
        returned, otherwise ``move_data`` is returned.

    Returns
    -------
    DataFrame or None
        DataFrame with the additional feature, or None when ``inplace``.

    """
    # Missing values compare as False, so they are never flagged themselves.
    exceeds = (move_data[feature] >= value).to_numpy()

    # Work positionally instead of computing ``label - 1``: this also marks
    # the previous row, but does not depend on a consecutive integer index
    # and cannot produce a non-existent label when the first row matches.
    flagged = exceeds.copy()
    flagged[:-1] |= exceeds[1:]

    # Whole-column assignment replaces the scalar accessor ``.at``, which
    # does not accept an array of row labels.
    move_data[new_label] = flagged

    return _end_create_operation(
        move_data, new_label, inplace
    )
@timer_decorator
def outliers(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    jump_coefficient: float = 3.0,
    threshold: float = 1,
    new_label: str = OUTLIER,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a boolean feature to detect outliers.

    A point is flagged when its distances to the next point, to the previous
    point and between the previous and next points are all greater than
    ``threshold``, and ``jump_coefficient`` times the previous-to-next
    distance is smaller than both the distance to the next point and the
    distance to the previous point.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    jump_coefficient : float, optional
        Multiplier applied to the previous-to-next distance before it is
        compared with the distances to the adjacent points, by default 3
    threshold : float, optional
        Minimum value that the distance features must have
        in order to be considered outliers, by default 1
    new_label: string, optional
        The name of the new feature with detected outliers,
        by default OUTLIER
    inplace : bool, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned, by default False

    Returns
    -------
    DataFrame
        Returns a dataframe with the trajectories outliers or None

    """
    if not inplace:
        move_data = move_data.copy()

    if DIST_TO_PREV not in move_data:
        move_data.generate_dist_features()

    if move_data.index.name is not None:
        logger.debug('...Reset index for filtering\n')
        move_data.reset_index(inplace=True)

    # Every distance column must really exist. The previous check tested the
    # truthiness of the DIST_TO_NEXT constant instead of its membership, so a
    # missing column raised KeyError instead of reaching the warning below.
    required_features = (DIST_TO_PREV, DIST_TO_NEXT, DIST_PREV_TO_NEXT)
    if all(feature in move_data for feature in required_features):
        jump = jump_coefficient * move_data[DIST_PREV_TO_NEXT]
        filter_ = (
            (move_data[DIST_TO_NEXT] > threshold)
            & (move_data[DIST_TO_PREV] > threshold)
            & (move_data[DIST_PREV_TO_NEXT] > threshold)
            & (jump < move_data[DIST_TO_NEXT])
            & (jump < move_data[DIST_TO_PREV])
        )
        move_data[new_label] = filter_
    else:
        logger.warning('...Distances features were not created')

    if not inplace:
        return move_data
@timer_decorator
def create_or_update_out_of_the_bbox(
    move_data: DataFrame,
    bbox: tuple[int, int, int, int],
    new_label: str = OUT_BBOX,
    inplace: bool = False
) -> DataFrame | None:
    """
    Create or update a boolean feature to detect points out of the bbox.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    bbox : tuple
        Tuple of 4 elements, containing the minimum and maximum values
        of latitude and longitude of the bounding box.
    new_label: string, optional
        The name of the new feature with detected points out of the bbox,
        by default OUT_BBOX
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Returns dataframe with a boolean feature with detected
        points out of the bbox, or None

    Raises
    ------
    ValueError
        If the bbox filter returns None

    """
    if not inplace:
        move_data = move_data.copy()

    logger.debug('\nCreate or update boolean feature to detect points out of the bbox')
    filtered_ = filters.by_bbox(move_data, bbox, filter_out=True)

    if filtered_ is None:
        raise ValueError('Filter bbox failed!')

    logger.debug('...Creating a new label named as %s' % new_label)
    move_data[new_label] = False

    if filtered_.shape[0] > 0:
        # '%s' is required here: the former '% as' was parsed as an ascii()
        # conversion and produced a garbled message.
        logger.debug('...Setting %s as True\n' % new_label)
        # .loc accepts a collection of row labels; .at is a scalar accessor.
        move_data.loc[filtered_.index, new_label] = True

    return _end_create_operation(
        move_data, new_label, inplace
    )
@timer_decorator
def create_or_update_gps_deactivated_signal(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_between_adj_points: float = 7200,
    new_label: str = DEACTIVATED,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a feature that marks points around a deactivated signal.

    A point is flagged when the time elapsed since the previous point is
    greater than or equal to ``max_time_between_adj_points``; the point
    preceding each flagged point is flagged as well.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    max_time_between_adj_points: float, optional
        The max time between adjacent points, by default 7200
    new_label: string, optional
        The name of the new feature with detected deactivated signals,
        by default DEACTIVATED
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the features created by ``generate_time_features``
        and the new boolean feature, or None

    """
    data = move_data if inplace else move_data.copy()

    logger.debug(
        'Create or update deactivated signal if time max > %s seconds\n'
        % max_time_between_adj_points
    )

    # The time features provide the TIME_TO_PREV column compared below.
    data.generate_time_features()

    return _process_simple_filter(
        move_data=data,
        new_label=new_label,
        feature=TIME_TO_PREV,
        value=max_time_between_adj_points,
        inplace=inplace,
    )
@timer_decorator
def create_or_update_gps_jump(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_dist_between_adj_points: float = 3000,
    new_label: str = JUMP,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a feature that marks the points involved in a gps jump.

    A point is flagged when its distance to the previous point is greater
    than or equal to ``max_dist_between_adj_points``; the point preceding
    each flagged point is flagged as well.

    Parameters
    ----------
    move_data: dataframe
        The input trajectories data.
    max_dist_between_adj_points: float, optional
        The maximum distance between adjacent points, by default 3000
    new_label: string, optional
        The name of the new feature with detected jumps, by default JUMP
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the features created by ``generate_dist_features``
        and the new boolean feature, or None

    """
    data = move_data if inplace else move_data.copy()

    logger.debug(
        'Create or update jump if dist max > %s meters\n'
        % max_dist_between_adj_points
    )

    # The distance features provide the DIST_TO_PREV column compared below.
    data.generate_dist_features()

    return _process_simple_filter(
        move_data=data,
        new_label=new_label,
        feature=DIST_TO_PREV,
        value=max_dist_between_adj_points,
        inplace=inplace,
    )
@timer_decorator
def create_or_update_short_trajectory(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_dist_between_adj_points: float = 3000,
    max_time_between_adj_points: float = 7200,
    max_speed_between_adj_points: float = 50,
    k_segment_max: int = 50,
    label_tid: str = TID_PART,
    new_label: str = SHORT,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a feature that marks points of short trajectories.

    The data is first segmented with ``segmentation.by_dist_time_speed``;
    every point belonging to a segment with at most ``k_segment_max`` points
    is then flagged.

    Parameters
    ----------
    move_data : dataframe
        The input trajectory data
    max_dist_between_adj_points : float, optional
        Maximum distance between adjacent points, forwarded to the
        segmentation, by default 3000
    max_time_between_adj_points : float, optional
        Maximum travel time between adjacent points, forwarded to the
        segmentation, by default 7200
    max_speed_between_adj_points : float, optional
        Maximum speed of travel between adjacent points, forwarded to the
        segmentation, by default 50
    k_segment_max: int, optional
        Maximum number of points a segment may have to be considered
        short, by default 50
    label_tid: str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    new_label: str, optional
        The name of the new feature with short trajectories, by default SHORT
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the segmentation features and the new boolean
        feature, or None

    """
    if not inplace:
        move_data = move_data.copy()

    logger.debug('\nCreate or update short trajectories...')

    segmentation.by_dist_time_speed(
        move_data,
        max_dist_between_adj_points=max_dist_between_adj_points,
        max_time_between_adj_points=max_time_between_adj_points,
        max_speed_between_adj_points=max_speed_between_adj_points,
        label_new_tid=label_tid,
        inplace=True
    )

    move_data[new_label] = False

    # Number of points in each segment; keep the ids of the short ones.
    points_per_segment = move_data.groupby(by=label_tid).size()
    is_short = points_per_segment <= k_segment_max
    short_ids = points_per_segment[is_short].index

    in_short_segment = move_data[label_tid].isin(short_ids)
    move_data.loc[in_short_segment, new_label] = True

    return _end_create_operation(move_data, new_label, inplace)
@timer_decorator
def create_or_update_gps_block_signal(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_stop: float = 7200,
    new_label: str = BLOCK,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Create or update a feature that marks segments with long motionless periods.

    The data is segmented with ``segmentation.by_max_dist`` using a maximum
    distance of 0; every point of a segment whose summed time to the
    previous point reaches ``max_time_stop`` is flagged.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    max_time_stop: float, optional
        Accumulated time (inclusive) from which a segment is flagged,
        by default 7200
    new_label: string, optional
        The name of the new feature with detected blocked signals,
        by default BLOCK
    label_tid : str, optional
        The label of the column containing the ids of the formed segments
        (the new split id), by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the new feature, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        DataFrame with the segmentation and dist/time/speed features plus
        the new boolean feature, or None

    """
    if not inplace:
        move_data = move_data.copy()

    logger.debug(
        'Create or update block_signal if max time stop > %s seconds\n'
        % max_time_stop
    )

    segmentation.by_max_dist(
        move_data,
        max_dist_between_adj_points=0.0,
        label_new_tid=label_tid,
        inplace=True
    )

    logger.debug('Updating dist time speed values')
    move_data.generate_dist_time_speed_features(label_id=label_tid)

    move_data[new_label] = False

    # Total time accumulated inside each segment.
    time_per_segment = move_data.groupby(by=label_tid).agg({TIME_TO_PREV: 'sum'})
    is_blocked = time_per_segment[TIME_TO_PREV] >= max_time_stop
    blocked_ids = time_per_segment[is_blocked].index

    in_blocked_segment = move_data[label_tid].isin(blocked_ids)
    move_data.loc[in_blocked_segment, new_label] = True

    return _end_create_operation(move_data, new_label, inplace)
@timer_decorator
def filter_block_signal_by_repeated_amount_of_points(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    amount_max_of_points_stop: float = 30.0,
    max_time_stop: float = 7200,
    filter_out: bool = False,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe points with blocked signal by amount of points.

    A segment is considered blocked when it holds more than
    ``amount_max_of_points_stop`` points flagged in the BLOCK column.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    amount_max_of_points_stop: float, optional
        Maximum number of stopped points, by default 30
    max_time_stop: float, optional
        Maximum time allowed with speed 0, only used to create the BLOCK
        feature when it is missing, by default 7200
    filter_out: boolean, optional
        If set to True, it will return trajectory points with blocked signal
        (the remaining segments are dropped); otherwise the blocked
        segments are dropped, by default False
    label_tid : str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    if not inplace:
        move_data = move_data.copy()

    if BLOCK not in move_data:
        create_or_update_gps_block_signal(
            move_data, max_time_stop, label_tid=label_tid, inplace=True
        )

    # Sum only the BLOCK column: summing every column is wasted work and
    # fails on columns that cannot be summed (e.g. datetimes).
    blocked_per_segment = move_data.groupby(by=label_tid)[BLOCK].sum()
    over_limit = blocked_per_segment > amount_max_of_points_stop

    # Segments selected here are the ones removed from the data.
    selected = ~over_limit if filter_out else over_limit
    ids_to_drop = blocked_per_segment[selected].index

    rows_to_drop = move_data[move_data[label_tid].isin(ids_to_drop)].index
    move_data.drop(index=rows_to_drop, inplace=True)

    if not inplace:
        return move_data
@timer_decorator
def filter_block_signal_by_time(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    max_time_stop: float = 7200,
    filter_out: bool = False,
    label_tid: str = TID_PART,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe points with blocked signal by time.

    A segment is considered blocked when its summed time to the previous
    point is greater than ``max_time_stop`` and it holds at least one point
    flagged in the BLOCK column.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    max_time_stop: float, optional
        Maximum time allowed with speed 0, by default 7200
    filter_out: boolean, optional
        If set to True, it will return trajectory points with blocked signal
        (the remaining segments are dropped); otherwise the blocked
        segments are dropped, by default False
    label_tid : str, optional
        The label of the column containing the ids of the formed segments,
        by default TID_PART
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    if not inplace:
        move_data = move_data.copy()

    if BLOCK not in move_data:
        create_or_update_gps_block_signal(
            move_data, max_time_stop, label_tid=label_tid, inplace=True
        )

    per_segment = move_data.groupby(by=label_tid).agg(
        {TIME_TO_PREV: 'sum', BLOCK: 'sum'}
    )
    exceeds_time = per_segment[TIME_TO_PREV] > max_time_stop
    has_blocked_point = per_segment[BLOCK] > 0
    blocked = exceeds_time & has_blocked_point

    # Segments selected here are the ones removed from the data.
    selected = ~blocked if filter_out else blocked
    ids_to_drop = per_segment[selected].index

    rows_to_drop = move_data[move_data[label_tid].isin(ids_to_drop)].index
    move_data.drop(index=rows_to_drop, inplace=True)

    return None if inplace else move_data
@timer_decorator
def filter_longer_time_to_stop_segment_by_id(
    move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    dist_radius: float = 30,
    time_radius: float = 900,
    label_id: str = TRAJ_ID,
    label_segment_stop: str = SEGMENT_STOP,
    filter_out: bool = False,
    inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
    """
    Filters from dataframe segment with longest stop time.

    For every id, the stop segment with the largest summed time to the
    previous point is located; those segments are either dropped or are the
    only ones kept, depending on ``filter_out``.

    Parameters
    ----------
    move_data: dataFrame
        The input trajectories data.
    dist_radius : float, optional
        The dist_radius defines the distance used in the segmentation,
        by default 30
    time_radius : float, optional
        The time_radius used to determine if a segment is a stop,
        by default 900
        If the user stayed in the segment for a time
        greater than time_radius, then the segment is a stop.
    label_id : str, optional
        The label of the column containing the ids of the trajectories,
        by default TRAJ_ID
    label_segment_stop: str, optional
        The label of the column containing the ids of the stop segments,
        by default SEGMENT_STOP
    filter_out: boolean, optional
        If set to True, it will return trajectory points with longer time
        (all other segments are dropped); otherwise the longest segments
        are dropped, by default False
    inplace : boolean, optional
        if set to true the original dataframe will be altered to contain
        the result of the filtering, otherwise a copy will be returned,
        by default False

    Returns
    -------
    DataFrame
        Filtered DataFrame with the additional features or None

    """
    if not inplace:
        move_data = move_data.copy()

    if label_segment_stop not in move_data:
        # NOTE(review): label_segment_stop is not forwarded here, so a custom
        # label presumably is not the column that gets created - confirm
        # against stay_point_detection.
        stay_point_detection.create_or_update_move_stop_by_dist_time(
            move_data, dist_radius, time_radius, inplace=True
        )

    # Total time spent in each (id, stop segment) pair.
    time_per_stop = move_data.groupby(
        [label_id, label_segment_stop], as_index=False
    ).agg({TIME_TO_PREV: 'sum'})

    # Row positions in ``time_per_stop`` of the longest segment of each id.
    longest_rows = time_per_stop.groupby(
        [label_id], as_index=False
    ).idxmax()[TIME_TO_PREV]

    is_longest = time_per_stop.index.isin(longest_rows)

    # Segments selected here are the ones removed from the data.
    selected = ~is_longest if filter_out else is_longest
    segments = time_per_stop.loc[selected][label_segment_stop]

    rows_to_drop = move_data[move_data[label_segment_stop].isin(segments)].index
    move_data.drop(index=rows_to_drop, inplace=True)

    return None if inplace else move_data