Source code for pymove.utils.data_augmentation

"""
Data augmentation operations.

append_row,
generate_trajectories_df,
generate_start_feature,
generate_destiny_feature,
split_crossover,
augmentation_trajectories_df,
insert_points_in_df,
instance_crossover_augmentation

"""
from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from pandas.core.frame import DataFrame
from pandas.core.series import Series

from pymove.utils.constants import DESTINY, START, TID, TRAJECTORY
from pymove.utils.log import progress_bar

if TYPE_CHECKING:
    from pymove.core.dask import DaskMoveDataFrame
    from pymove.core.pandas import PandasMoveDataFrame


def append_row(
    data: DataFrame,
    row: Series | None = None,
    columns: dict | None = None
):
    """
    Insert a new line in the dataframe with the information passed by parameter.

    The dataframe is modified in place. The new line is written at the label
    ``data.shape[0]``, i.e. the position right after the last row when the
    dataframe uses the default ``0..n-1`` index. If ``row`` is given it takes
    precedence over ``columns``. When neither a ``row`` nor a ``columns``
    dictionary is given, nothing is inserted.

    Parameters
    ----------
    data : DataFrame
        The input trajectories data.
    row : Series, optional
        The row of a dataframe, its index holds the column names,
        by default None
    columns : dict, optional
        Dictionary mapping column names to the values to be added,
        by default None

    """
    if row is not None:
        keys = row.index.tolist()
        values = row.values
    elif isinstance(columns, dict):
        keys = list(columns.keys())
        # Convert numpy scalars/arrays to plain python objects before storing
        values = [np.array(v).tolist() for v in columns.values()]
    else:
        # Nothing was passed, so there is nothing to insert
        return

    # ``.at`` only accepts a single row/column label pair; writing several
    # columns at once (and enlarging the frame) is what ``.loc`` is for.
    data.loc[data.shape[0], keys] = values
def generate_trajectories_df(
    data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame'
) -> DataFrame:
    """
    Build a dataframe holding one line per trajectory.

    The trajectory id column is generated when it is missing and the index of
    ``data`` is reset in place. For each trajectory with more than four
    points, a column with one distinct value is stored as that value; any
    other column is stored as the list of its values.

    Parameters
    ----------
    data : DataFrame
        The input trajectory data.

    Return
    ------
    DataFrame
        DataFrame of the trajectories

    """
    def _collapse(points, col):
        # One distinct value is kept as is, otherwise the sequence is kept
        first_value = points.at[0, col]
        if points[col].nunique() == 1:
            return first_value
        return np.array(points[col], dtype=type(first_value)).tolist()

    if TID not in data:
        data.generate_tid_based_on_id_datetime()
    data.reset_index(drop=True, inplace=True)

    trajectory_ids = data[TID].unique()
    trajectories = pd.DataFrame(columns=data.columns)

    for trajectory_id in progress_bar(
        trajectory_ids, total=len(trajectory_ids)
    ):
        points = data[data[TID] == trajectory_id]
        points.reset_index(drop=True, inplace=True)

        # Short trajectories (four points or fewer) are ignored
        if points.shape[0] <= 4:
            continue

        cells = [_collapse(points, col) for col in points.columns]
        append_row(trajectories, row=pd.Series(cells, points.columns))

    return trajectories
def generate_start_feature(
    data: DataFrame, label_trajectory: str = TRAJECTORY
):
    """
    Adds a new column 'start' with the first point of each trajectory.

    The dataframe is modified in place and the trajectory column itself is
    left untouched. Nothing is done when the column already exists.

    Parameters
    ----------
    data : DataFrame
        The input trajectory data.
    label_trajectory : str, optional
        Label of the points sequences, by default TRAJECTORY

    """
    if START in data:
        return

    first_points = data[label_trajectory].apply(
        lambda sequence: np.int64(sequence[0])
    )
    data[START] = first_points
def generate_destiny_feature(
    data: DataFrame, label_trajectory: str = TRAJECTORY
):
    """
    Adds a new column 'destiny' with the last point of each trajectory.

    The dataframe is modified in place and the trajectory column itself is
    left untouched. Nothing is done when the column already exists.

    Parameters
    ----------
    data : DataFrame
        The input trajectory data.
    label_trajectory : str, optional
        Label of the points sequences, by default 'trajectory'

    """
    if DESTINY in data:
        return

    last_points = data[label_trajectory].apply(
        lambda sequence: np.int64(sequence[-1])
    )
    data[DESTINY] = last_points
def split_crossover(
    sequence_a: list, sequence_b: list, frac: float = 0.5
) -> tuple[list, list]:
    """
    Cuts two sequences in the indicated ratio and swaps their tails.

    Each sequence is cut at ``int(len(sequence) * frac)``. The first result
    is the head of ``sequence_a`` followed by the tail of ``sequence_b``, the
    second one is the head of ``sequence_b`` followed by the tail of
    ``sequence_a``. Both results are built with ``np.concatenate``.

    Parameters
    ----------
    sequence_a : list or ndarray
        Array any
    sequence_b : list or ndarray
        Array any
    frac : float, optional
        Represents the percentage to be exchanged, by default 0.5

    Returns
    -------
    Tuple[List, List]
        Arrays with the halves exchanged.

    """
    cut_a = int(len(sequence_a) * frac)
    cut_b = int(len(sequence_b) * frac)

    head_a, tail_a = sequence_a[:cut_a], sequence_a[cut_a:]
    head_b, tail_b = sequence_b[:cut_b], sequence_b[cut_b:]

    crossed_a = np.concatenate((head_a, tail_b))
    crossed_b = np.concatenate((head_b, tail_a))
    return crossed_a, crossed_b
def _augmentation(data: DataFrame, aug_df: DataFrame, frac: float = 0.5):
    """
    Crosses every pair of lines of ``data`` and appends the results to ``aug_df``.

    The index of ``data`` is reset in place. For each pair of lines and each
    column: when both values are sequences (list or ndarray) they are crossed
    with ``split_crossover``; when both are strings they are joined with '_'
    in both orders; otherwise the values are kept unchanged. Each pair adds
    two new lines to ``aug_df``.

    Parameters
    ----------
    data : DataFrame
        The input trajectories data.
    aug_df : DataFrame
        The dataframe with new trajectories
    frac : number, optional
        Represents the percentage to be exchanged, by default 0.5

    """
    sequence_types = (list, np.ndarray)

    data.reset_index(drop=True, inplace=True)
    columns = data.columns

    for first in range(data.shape[0] - 1):
        for second in range(first + 1, data.shape[0]):
            crossed_first = []
            crossed_second = []

            for col in columns:
                left = data.at[first, col]
                right = data.at[second, col]

                both_sequences = (
                    isinstance(left, sequence_types)
                    and isinstance(right, sequence_types)
                )
                if both_sequences:
                    left, right = split_crossover(left, right, frac=frac)
                elif isinstance(left, str) and isinstance(right, str):
                    # Label values are combined so the origin stays visible
                    left, right = left + '_' + right, right + '_' + left

                crossed_first.append(left)
                crossed_second.append(right)

            for crossed in (crossed_first, crossed_second):
                append_row(aug_df, row=pd.Series(crossed, index=columns))
def augmentation_trajectories_df(
    data: 'PandasMoveDataFrame' | 'DaskMoveDataFrame',
    restriction: str = 'destination only',
    label_trajectory: str = TRAJECTORY,
    insert_at_df: bool = False,
    frac: float = 0.5,
) -> DataFrame:
    """
    Generates new data from unobserved trajectories, given a specific restriction.

    By default, the algorithm uses the same route destination constraint.
    The destiny feature (and the start feature, for the
    'departure and destination' restriction) is added to ``data`` in place
    when it is missing. Trajectories are then grouped by destiny (and start)
    and every group with at least two trajectories is crossed pairwise.

    Parameters
    ----------
    data : DataFrame
        The input trajectories data.
    restriction : str, optional
        Constraint used to generate new data, by default 'destination only'.
        With 'departure and destination' only trajectories sharing both
        start and destiny are crossed; any other value groups by destiny
        only.
    label_trajectory : str, optional
        Label of the points sequences, by default TRAJECTORY
    insert_at_df : boolean, optional
        If True the generated trajectories are appended to ``data`` itself,
        which is then returned. If False they are collected in a new
        DataFrame, by default False
    frac : number, optional
        Represents the percentage to be exchanged, by default 0.5

    Returns
    -------
    DataFrame
        Dataframe with the new data generated

    """
    same_start = restriction == 'departure and destination'

    if DESTINY not in data:
        generate_destiny_feature(data, label_trajectory=label_trajectory)

    if same_start:
        # Use the caller's trajectory column, as done for the destiny feature
        generate_start_feature(data, label_trajectory=label_trajectory)

    if insert_at_df:
        aug_df = data
    else:
        aug_df = pd.DataFrame(columns=data.columns)

    destinations = data[DESTINY].unique()
    for dest in progress_bar(destinations, total=len(destinations)):
        filter_ = data[data[DESTINY] == dest]

        if same_start:
            starts = filter_[START].unique()
            for st in progress_bar(starts, total=len(starts)):
                f_filter_ = filter_[filter_[START] == st]
                # A crossover needs at least two trajectories
                if f_filter_.shape[0] >= 2:
                    _augmentation(f_filter_, aug_df, frac=frac)
        elif filter_.shape[0] >= 2:
            _augmentation(filter_, aug_df, frac=frac)

    return aug_df
def insert_points_in_df(data: DataFrame, aug_df: DataFrame):
    """
    Inserts the points of the generated trajectories to the original data sets.

    Each line of ``aug_df`` is expanded into one line per point: columns
    holding a sequence (list or ndarray) provide the values for each point,
    the remaining columns are repeated for all of them. Only columns that
    exist in ``data`` are used. The points are appended to ``data`` in place.

    Parameters
    ----------
    data : DataFrame
        The input trajectories data
    aug_df : DataFrame
        The data of unobserved trajectories

    """
    sequence_types = (list, np.ndarray)

    for _, trajectory in progress_bar(
        aug_df.iterrows(), total=aug_df.shape[0]
    ):
        names = trajectory.index.tolist()
        cells = trajectory.values.tolist()
        known = [(name, cell) for name, cell in zip(names, cells) if name in data]

        points = pd.DataFrame()
        # Sequence columns go first so they define the number of points
        for name, cell in known:
            if isinstance(cell, sequence_types):
                points[name] = cell
        # Remaining values are then repeated for every point
        for name, cell in known:
            if not isinstance(cell, sequence_types):
                points[name] = cell

        for _, point in points.iterrows():
            append_row(data, row=point)
def instance_crossover_augmentation(
    data: DataFrame,
    restriction: str = 'destination only',
    label_trajectory: str = TRAJECTORY,
    frac: float = 0.5
):
    """
    Generates new data from unobserved trajectories, with a specific restriction.

    The points of ``data`` are grouped into trajectories, the trajectories
    are crossed according to the restriction and the points of the generated
    trajectories are inserted on the original dataframe. By default, the
    algorithm uses the same destination constraint as the route.

    Parameters
    ----------
    data : DataFrame
        The input trajectories data
    restriction : str, optional
        Constraint used to generate new data, by default 'destination only'
    label_trajectory : str, optional
        Label of the points sequences, by default 'trajectory'
    frac : number, optional
        Represents the percentage to be exchanged, by default 0.5

    """
    trajectories = generate_trajectories_df(data)

    # Features used to group the trajectories before the crossover
    generate_destiny_feature(trajectories, label_trajectory=label_trajectory)
    if restriction == 'departure and destination':
        generate_start_feature(
            trajectories, label_trajectory=label_trajectory
        )

    generated = augmentation_trajectories_df(
        trajectories, restriction=restriction, frac=frac
    )
    insert_points_in_df(data, generated)