"""
Filtering operations.
get_bbox_by_radius,
by_bbox,
by_datetime,
by_label,
by_id,
by_tid,
clean_consecutive_duplicates,
clean_gps_jumps_by_distance,
clean_gps_nearby_points_by_distances,
clean_gps_nearby_points_by_speed,
clean_gps_speed_max_radius,
clean_trajectories_with_few_points,
clean_trajectories_short_and_few_points,
clean_id_by_time_max
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable
import numpy as np
from pandas import DataFrame
from pymove.semantic.semantic import outliers
from pymove.utils.constants import (
DATETIME,
DIST_TO_PREV,
LATITUDE,
LONGITUDE,
OUTLIER,
SPEED_TO_PREV,
TID,
TIME_TO_PREV,
TRAJ_ID,
)
from pymove.utils.log import logger
if TYPE_CHECKING:
from pymove.core.dask import DaskMoveDataFrame
from pymove.core.pandas import PandasMoveDataFrame
[docs]def get_bbox_by_radius(
coordinates: tuple[float, float], radius: float = 1000
) -> tuple[float, float, float, float]:
"""
Defines minimum and maximum coordinates, given a distance radius from a point.
Parameters
----------
coords : tuple (lat, lon)
The coordinates of point
radius: float, optional (1000 by default)
Returns
-------
array
coordinates min and max of the bbox
References
----------
https://mathmesquita.me/2017/01/16/filtrando-localizacao-em-um-raio.html
"""
earth_radius = 6371000
r = radius / earth_radius
lat, lon = np.radians(coordinates)
latmin = lat - r
latmax = lat + r
delta_lon = np.arcsin(np.sin(r) / np.cos(lat))
lonmin = lon - delta_lon
lonmax = lon + delta_lon
return tuple(np.rad2deg([latmin, lonmin, latmax, lonmax])) # type: ignore
[docs]def by_bbox(
move_data: DataFrame,
bbox: tuple[float, float, float, float],
filter_out: bool = False,
inplace: bool = False
) -> DataFrame | None:
"""
Filters points of the trajectories according to specified bounding box.
Parameters
----------
move_data : dataframe
The input trajectories data
bbox : tuple
Tuple of 4 elements, containing the minimum and maximum values
of latitude and longitude of the bounding box.
filter_out : boolean, optional
If set to false the function will return the trajectories points
within the bounding box, and the points outside otherwise, by default False
inplace : boolean, optional
if set to true the original dataframe will be altered to contain
the result of the filtering, otherwise a copy will be returned, by default False
Returns
-------
DataFrame
Returns dataframe with trajectories points filtered by bounding box or None
"""
filter_ = (
(move_data[LATITUDE] >= bbox[0])
& (move_data[LONGITUDE] >= bbox[1])
& (move_data[LATITUDE] <= bbox[2])
& (move_data[LONGITUDE] <= bbox[3])
)
if filter_out:
filter_ = ~filter_
return move_data.drop(index=move_data[~filter_].index, inplace=inplace)
[docs]def by_datetime(
move_data: DataFrame,
start_datetime: str | None = None,
end_datetime: str | None = None,
filter_out: bool = False,
inplace: bool = False,
) -> DataFrame | None:
"""
Filters trajectories points according to specified time range.
Parameters
----------
move_data : dataframe
The input trajectory data
start_datetime : str
The start date and time (Datetime format) of the time range, by default None
end_datetime : str
The end date and time (Datetime format) of the time range, by default None
filter_out : bool, optional
If set to true, the function will return the points of
the trajectories with timestamp outside the time range.
The points whithin the time range will be return if filter_out is False.
by default False
inplace : bool, optional
if set to true the original dataframe will be altered to contain
the result of the filtering, otherwise a copy will be returned, by default False
Returns
-------
DataFrame
Returns dataframe with trajectories points filtered by time range or None
"""
if start_datetime is not None and end_datetime is not None:
filter_ = (
(move_data[DATETIME] >= start_datetime)
& (move_data[DATETIME] <= end_datetime)
)
elif end_datetime is not None:
filter_ = move_data[DATETIME] <= end_datetime
else:
filter_ = move_data[DATETIME] >= start_datetime
if filter_out:
filter_ = ~filter_
return move_data.drop(index=move_data[~filter_].index, inplace=inplace)
[docs]def by_label(
move_data: DataFrame,
value: Any,
label_name: str,
filter_out: bool = False,
inplace: bool = False
) -> DataFrame | None:
"""
Filters trajectories points according to specified value and column label.
Parameters
----------
move_data : dataframe
The input trajectory data
value : The value to be use to filter the trajectories
Specifies the value used to filter the trajectories points
label_name : str
Specifies the label of the column used in the filtering
filter_out : bool, optional
If set to true, the function will return the points of
the trajectories with timestamp outside the time range.
The points whithin the time range will be return if filter_out is False.
by default False
inplace : bool, optional
if set to true the original dataframe will be altered to contain
the result of the filtering, otherwise a copy will be returned, by default False
Returns
-------
DataFrame
Returns dataframe with trajectories points filtered by label or None
"""
filter_ = move_data[label_name] == value
if filter_out:
filter_ = ~filter_
return move_data.drop(index=move_data[~filter_].index, inplace=inplace)
[docs]def by_id(
move_data: DataFrame,
id_: int | None = None,
label_id: str = TRAJ_ID,
filter_out: bool = False,
inplace: bool = False
) -> DataFrame | None:
"""
Filters trajectories points according to specified trajectory id.
Parameters
----------
move_data : dataframe
The input trajectory data
id_ : int
Specifies the number of the id used to filter the trajectories points
label_id : str, optional
The label of the column which contains the id of the trajectories,
by default TRAJ_ID
filter_out : bool, optional
If set to true, the function will return the points of
the trajectories with timestamp outside the time range.
The points whithin the time range will be return if filter_out is False.
by default False
inplace : bool, optional
if set to true the original dataframe will be altered to contain
the result of the filtering, otherwise a copy will be returned, by default False
Returns
-------
DataFrame
Returns dataframe with trajectories points filtered by id or None
"""
return by_label(move_data, id_, label_id, filter_out, inplace)
[docs]def by_tid(
move_data: DataFrame,
tid_: str | None = None,
filter_out: bool = False,
inplace: bool = False
) -> DataFrame | None:
"""
Filters trajectories points according to a specified trajectory tid.
Parameters
----------
move_data : dataframe
The input trajectory data
tid_ : str
Specifies the number of the tid used to filter the trajectories points
label_tid : str, optional
The label of the column in the user dataframe which contains
the tid of the trajectories, by default None
filter_out : bool, optional
If set to true, the function will return the points of
the trajectories with timestamp outside the time range.
The points whithin the time range will be return if filter_out is False.
by default False
inplace : bool, optional
if set to true the original dataframe will be altered to contain
the result of the filtering, otherwise a copy will be returned, by default False
Returns
-------
DataFrame
Returns a dataframe with trajectories points filtered or None
"""
return by_label(move_data, tid_, TID, filter_out, inplace)
[docs]def clean_consecutive_duplicates(
move_data: DataFrame,
subset: int | str | None = None,
keep: str | bool = 'first',
inplace: bool = False
) -> DataFrame | None:
"""
Removes consecutive duplicate rows of the Dataframe.
Optionally only certain columns can be consider.
Parameters
----------
move_data : dataframe
The input trajectory data
subset : Array of str, optional
Specifies Column label or sequence of labels, considered for
identifying duplicates, by default None
keep : 'first', 'last', optional
If keep is set as first, all the duplicates except for
the first occurrence will be dropped.
On the other hand if set to last, all duplicates except for
the last occurrence will be dropped.
If set to False, all duplicates are dropped.
by default 'first'
inplace : boolean, optional
if set to true the original dataframe will be altered,
the duplicates will be dropped in place,
otherwise a copy will be returned, by default False
Returns
-------
DataFrame
The filtered trajectories points without consecutive duplicates or None
"""
if keep == 'first':
n = 1
else:
n = -1
if subset is None:
filter_ = (move_data.shift(n) != move_data).any(axis=1)
else:
filter_ = (move_data[subset].shift(n) != move_data[subset]).any(axis=1)
return move_data.drop(index=move_data[~filter_].index, inplace=inplace)
def _filter_single_by_max(move_data: DataFrame, **kwargs):
"""
Filters from a dataframe rows with features below value.
Parameters
----------
move_data : dataframe
Dataframe to be filtered.
**kwargs : arguments
- arg1 : feature
- arg2 : value
Returns
-------
DataFrame
Filtered dataframe.
"""
return move_data[move_data[kwargs['arg1']] <= kwargs['arg2']]
def _filter_speed_max_radius(move_data: DataFrame, **kwargs):
"""
Filters from a dataframe rows with current or previous row features exceeding value.
Parameters
----------
move_data : dataframe
Dataframe to be filtered.
**kwargs : arguments
- arg1 : feature
- arg2 : value
Returns
-------
DataFrame
Filtered dataframe.
"""
filter_ = (
(np.nan_to_num(move_data[kwargs['arg1']].shift(1)) > kwargs['arg2'])
| (np.nan_to_num(move_data[kwargs['arg1']]) > kwargs['arg2'])
)
return move_data[filter_]
def _filter_data(move_data: DataFrame, f: Callable, kwargs: dict):
"""
Filter the dataframe using condition from given function.
Parameters
----------
move_data : dataframe
Dataframe to be filtered.
f : function
Filtering function
**kwargs : arguments
- arg1 : feature
- arg2 : value
- outliers : special behavior if cleaning by outliers
Returns
-------
dataframe
Filtered dataframe.
int
Number of rows to be dropped
"""
if kwargs['outliers']:
filter_data_points = f(
move_data,
jump_coefficient=kwargs['arg1'],
threshold=kwargs['arg2'],
inplace=False
)
filter_data_points = filter_data_points[filter_data_points[OUTLIER]]
else:
filter_data_points = f(
move_data,
arg1=kwargs['arg1'],
arg2=kwargs['arg2'],
inplace=False
)
rows_to_drop = filter_data_points.shape[0]
return filter_data_points, rows_to_drop
def _clean_gps(move_data: DataFrame, f: Callable, **kwargs):
"""
Cleans gps points from a dataframe using condition from given function.
Parameters
----------
move_data : dataframe
Dataframe to be filtered.
f : function
Filtering function
**kwargs : arguments
- arg1 : feature
- arg2 : value
- outliers : special behavior if cleaning by outliers
Returns
-------
dataframe
Filtered dataframe.
"""
if move_data.index.name is not None:
logger.debug('...Reset index for filtering\n')
move_data.reset_index(inplace=True)
filter_data_points, rows_to_drop = _filter_data(move_data, f, kwargs)
sum_drop = 0
while rows_to_drop > 0:
logger.debug('...Dropping %s rows of gps points\n' % rows_to_drop)
shape_before = move_data.shape[0]
move_data.drop(index=filter_data_points.index, inplace=True)
sum_drop = sum_drop + rows_to_drop
logger.debug(
'...Rows before: %s, Rows after:%s, Sum drop:%s\n'
% (shape_before, move_data.shape[0], sum_drop)
)
filter_data_points, rows_to_drop = _filter_data(move_data, f, kwargs)
logger.debug('%s GPS points were dropped' % sum_drop)
return move_data
[docs]def clean_gps_jumps_by_distance(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TRAJ_ID,
jump_coefficient: float = 3.0,
threshold: float = 1,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Removes the trajectories points that are outliers from the dataframe.
Parameters
----------
move_data : dataframe
The input trajectory data
label_id : str, optional
Indicates the label of the id column in the user dataframe, by default TRAJ_ID
jump_coefficient : float, optional
by default 3
threshold : float, optional
Minimum value that the distance features must have
in order to be considered outliers, by default 1
label_dtype : type, optional
Represents column id type, by default np.float64.
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
DataFrame
The filtered trajectories without the gps jumps or None
"""
if not inplace:
move_data = move_data.copy()
if DIST_TO_PREV not in move_data:
move_data.generate_dist_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug(
'\nCleaning gps jumps by distance to jump_coefficient %s...\n'
% jump_coefficient
)
move_data = _clean_gps(
move_data,
outliers,
arg1=jump_coefficient,
arg2=threshold,
outliers=True
)
if not inplace:
return move_data
[docs]def clean_gps_nearby_points_by_distances(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TRAJ_ID,
radius_area: float = 10.0,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Removes points from the trajectories with smaller distance from the point before.
Parameters
----------
move_data : dataframe
The input trajectory data
label_id : str, optional
Indicates the label of the id column in the user dataframe, by default TRAJ_ID
radius_area : float, optional
Species the minimum distance a point must have to it"srs previous point
in order not to be dropped, by default 10
label_dtype : type, optional
Represents column id type, ,y default np.float64.
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, be default False
Returns
-------
DataFrame
The filtered trajectories without the gps nearby points by distance or None
"""
if not inplace:
move_data = move_data.copy()
if DIST_TO_PREV not in move_data:
move_data.generate_dist_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug(
'\nCleaning gps points from radius of %s meters\n'
% radius_area
)
move_data = _clean_gps(
move_data,
_filter_single_by_max,
arg1=DIST_TO_PREV,
arg2=radius_area,
outliers=False
)
if not inplace:
return move_data
[docs]def clean_gps_nearby_points_by_speed(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TRAJ_ID,
speed_radius: float = 0.0,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Removes points from the trajectories with smaller speed of travel.
Parameters
----------
move_data : dataframe
The input trajectory data
label_id : str, optional
Indicates the label of the id column in the user dataframe, be defalt TRAJ_ID
speed_radius : float, optional
Species the minimum speed a point must have from it"srs previous point,
in order not to be dropped, by default 0
label_dtype : type, optional
Represents column id type, by default np.float64.
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
DataFrame
The filtered trajectories without the gps nearby points by speed or None
"""
if not inplace:
move_data = move_data.copy()
if SPEED_TO_PREV not in move_data:
move_data.generate_dist_time_speed_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug(
'\nCleaning gps points using %s speed radius\n'
% speed_radius
)
move_data = _clean_gps(
move_data,
_filter_single_by_max,
arg1=SPEED_TO_PREV,
arg2=speed_radius,
outliers=False
)
if not inplace:
return move_data
[docs]def clean_gps_speed_max_radius(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TRAJ_ID,
speed_max: float = 50.0,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Removes trajectories points with higher speed.
Given any point p of the trajectory, the point will
be removed if one of the following happens: if the travel speed from the
point before p to p is greater than the max value of speed between adjacent
points set by the user. Or the travel speed between point p and the next
point is greater than the value set by the user. When the cleaning is done,
the function will update the time and distance features in the dataframe and
will call itself again. The function will finish processing when it can no
longer find points disrespecting the limit of speed.
Parameters
----------
move_data : dataframe
The input trajectory data
label_id : str, optional
Indicates the label of the id column in the user dataframe, by default TRAJ_ID
speed_max : float, optional
Indicates the maximum value a point speed_to_prev and speed_to_next
should have, in order not to be dropped, by default 50
label_dtype : type, optional
Represents column id type, by default np.float64.
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
DataFrame
The filtered trajectories without the gps nearby points or None
"""
if not inplace:
move_data = move_data.copy()
if SPEED_TO_PREV not in move_data:
move_data.generate_dist_time_speed_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug(
'\nClean gps points with speed max > %s meters by seconds'
% speed_max
)
move_data = _clean_gps(
move_data,
_filter_speed_max_radius,
arg1=SPEED_TO_PREV,
arg2=speed_max,
outliers=False
)
if not inplace:
return move_data
[docs]def clean_trajectories_with_few_points(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_tid: str = TID,
min_points_per_trajectory: int = 2,
inplace: bool = False
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Removes from the given dataframe, trajectories with fewer points.
Parameters
----------
move_data : dataframe
The input trajectory data
label_tid : str, optional
The label of the column which contains the tid of the trajectories, by default TID
min_points_per_trajectory: int, optional
Specifies the minimum number of points a trajectory must have
in order not to be dropped, by default 2
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
DataFrame
The filtered trajectories without the minimum number of gps points or None
Raises
------
KeyError
If the label feature is not in the dataframe
"""
if not inplace:
move_data = move_data.copy()
if label_tid not in move_data:
raise KeyError('%s not in dataframe' % label_tid)
logger.debug(
'\nCleaning gps points from trajectories of fewer than %s points\n'
% min_points_per_trajectory
)
if move_data.index.name is not None:
logger.debug('\n...Reset index for filtering\n')
move_data.reset_index(inplace=True)
move_datacount_tid = move_data.groupby(by=label_tid).size()
filter_ = move_datacount_tid < min_points_per_trajectory
tids_with_few_points = move_datacount_tid[filter_].index
shape_before_drop = move_data.shape
idx = move_data[move_data[label_tid].isin(tids_with_few_points)].index
if idx.shape[0] > 0:
logger.debug(
'\n...There are %s ids with few points'
% tids_with_few_points.shape[0]
)
logger.debug(
'\n...Tids before drop: %s'
% move_data[label_tid].unique().shape[0]
)
move_data.drop(index=idx, inplace=True)
logger.debug(
'\n...Tids after drop: %s'
% move_data[label_tid].unique().shape[0]
)
logger.debug(
'\n...Shape - before drop: %s - after drop: %s'
% (shape_before_drop, move_data.shape)
)
if not inplace:
return move_data
[docs]def clean_trajectories_short_and_few_points(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TID,
min_trajectory_distance: float = 100,
min_points_per_trajectory: int = 2,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Eliminates from the given dataframe trajectories with fewer points and shorter length.
Parameters
----------
move_data : dataframe
The input trajectory data
label_id : str, optional
The label of the column which contains the tid of the trajectories, by default TID
min_trajectory_distance: float, optional
Specifies the minimun length a trajectory must have
in order not to be dropped, by default 100
min_points_per_trajectory: int, optional
Specifies the minimun number of points a trajectory must have
in order not to be dropped, by default 2
label_dtype : type, optional
Represents column id type, by default np.float64.
inplace: boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
DataFrame
The filtered trajectories with the minimum gps points and distance or None
Notes
-----
remove_tids_with_few_points must be performed before updating features.
"""
if not inplace:
move_data = move_data.copy()
logger.debug('\nRemove short trajectories...')
clean_trajectories_with_few_points(
move_data, label_id, min_points_per_trajectory, inplace=True
)
if DIST_TO_PREV not in move_data:
move_data.generate_dist_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug('\n...Dropping unnecessary trajectories...')
if move_data.index.name is not None:
logger.debug('reseting index')
move_data.reset_index(inplace=True)
move_dataagg_tid = move_data.groupby(by=label_id).agg(
{DIST_TO_PREV: 'sum'}
)
filter_ = move_dataagg_tid[DIST_TO_PREV] < min_trajectory_distance
tid_selection = move_dataagg_tid[filter_].index
logger.debug(
'\n...short trajectories and trajectories with a minimum distance (%s): %s'
% (move_dataagg_tid.shape[0], min_trajectory_distance)
)
logger.debug('\n...There are %s tid do drop' % tid_selection.shape[0])
shape_before_drop = move_data.shape
idx = move_data[move_data[label_id].isin(tid_selection)].index
if idx.shape[0] > 0:
tids_before_drop = move_data[label_id].unique().shape[0]
logger.debug(
'\n...Tids - before drop: %s - after drop: %s'
% (tids_before_drop, move_data[label_id].unique().shape[0])
)
move_data.drop(index=idx, inplace=True)
logger.debug(
'\n...Shape - before drop: %s - after drop: %s'
% (shape_before_drop, move_data.shape)
)
if not inplace:
return move_data
[docs]def clean_id_by_time_max(
move_data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
label_id: str = TRAJ_ID,
time_max: float = 3600,
label_dtype: Callable = np.float64,
inplace: bool = False,
) -> 'PandasMoveDataFrame' | 'DaskMoveDataFrame' | None:
"""
Clears GPS points with time by ID greater than a user-defined limit.
Parameters
----------
move_data: dataframe.
The input data.
label_id: str, optional
The label of the column which contains the id of the trajectories,
by default TRAJ_ID
time_max: float, optional
Indicates the maximum value time a set of points with the
same id should have in order not to be dropped, by default 3600
label_dtype : type, optional
Represents column id type, by default np.float64.
inplace : boolean, optional
if set to true the operation is done in place, the original
dataframe will be altered and None is returned, by default False
Returns
-------
dataframe or None
The filtered trajectories with the maximum time.
"""
if not inplace:
move_data = move_data.copy()
if TIME_TO_PREV not in move_data:
move_data.generate_dist_time_speed_features(
label_id=label_id, label_dtype=label_dtype
)
logger.debug(
'\nClean gps points with time max by id < %s seconds'
% time_max
)
move_dataid_drop = (
move_data.groupby([label_id], as_index=False)
.agg({TIME_TO_PREV: 'sum'})
.query(f'{TIME_TO_PREV} < {time_max}')
)
logger.debug(
'...Ids total: %s\nIds to drop:%s'
% (
move_data[label_id].nunique(),
move_dataid_drop[label_id].nunique()
)
)
if move_dataid_drop.shape[0] > 0:
before_drop = move_data.shape[0]
filter_ = move_data[label_id].isin(move_dataid_drop[label_id])
idx = move_data[filter_].index
move_data.drop(idx, inplace=True)
logger.debug(
'...Rows before drop: %s\n Rows after drop: %s'
% (before_drop, move_data.shape[0])
)
if not inplace:
return move_data