Source code for

# Authors:
#     Sylvain Faure <>
#     Bertrand Maury <>
# License: GPL

import sys
import random
import scipy as sp
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.lines import Line2D
from matplotlib import colors
import as cm

[docs]def plot_people_according_to_current_door_distance(ifig, people, domain, axis=None, savefig=False, filename='fig.png'): """ To draw occupied cells with colors depending on current door distances Parameters ---------- ifig: int figure number people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere domain: Domain contains everything for managing the domain axis: list matplotlib axis savefig: boolean writes the figure as a png file if true filename: string png filename used to write the figure """ fig = plt.figure(ifig) fig.clf() ax1 = fig.add_subplot(111) ax1.imshow(domain.image,interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower') ax1.imshow(domain.door_distance*(people==1),interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower',alpha = 1.0,cmap='Greys') if (axis): ax1.set_xlim(axis[0],axis[1]) ax1.set_ylim(axis[2],axis[3]) ax1.set_xticks([]) ax1.set_yticks([]) ax1.axis('off') # ax1.set_xlabel('x',color='white') # ax1.set_ylabel('y',color='white') # ax1.set_title('t',color='white') # ax1.spines['left'].set_color('white') # ax1.spines['right'].set_color('white') # ax1.spines['bottom'].set_color('white') # ax1.spines['top'].set_color('white') fig.canvas.draw() if (savefig): fig.savefig(filename,dpi=150,bbox_inches='tight',pad_inches=0) plt.pause(0.01)
[docs]def plot_people_according_to_initial_door_distance(ifig, people, domain, results, axis=None, savefig=False, filename='fig.png'): """ To draw occupied cells with colors depending on initial (time=0) door \ distances Parameters ---------- ifig: int figure number people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere domain: Domain contains everything for managing the domain axis: list matplotlib axis savefig: boolean writes the figure as a png file if true filename: string png filename used to write the figure """ fig = plt.figure(ifig) fig.clf() ax1 = fig.add_subplot(111) ax1.imshow(domain.image,interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower') I0 = results[:,0,0] J0 = results[:,1,0] D0 = domain.door_distance[I0,J0] I = results[:,0,-1] J = results[:,1,-1] tmp = domain.door_distance.copy() tmp[I,J] = D0 ax1.imshow(tmp*(people==1),interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower',alpha = 1.0,cmap='Greys') if (axis): ax1.set_xlim(axis[0],axis[1]) ax1.set_ylim(axis[2],axis[3]) ax1.set_xticks([]) ax1.set_yticks([]) # ax1.set_xlabel('x',color='white') # ax1.set_ylabel('y',color='white') # ax1.set_title('t',color='white') # ax1.spines['left'].set_color('white') # ax1.spines['right'].set_color('white') # ax1.spines['bottom'].set_color('white') # ax1.spines['top'].set_color('white') ax1.axis('off') fig.canvas.draw() if (savefig): fig.savefig(filename,dpi=150,bbox_inches='tight',pad_inches=0) plt.pause(0.01)
[docs]def compute_exit_times(dt, people, results): """ To compute exit times for all the individuals from the results array Parameters ---------- dt: float time step people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere results: numpy array positions at each time Returns ------- exit_times : all the exit times """ Np = results.shape[0] exit_times = -sp.ones(Np) for id in sp.arange(Np): exit_times[id] = dt*sp.where(results[id,0,:]==-1)[0].min() return exit_times
[docs]def plot_people_according_to_exit_times(ifig, dt, people, domain, results, axis=None, savefig=False, filename='fig.png'): """ To draw occupied cells with colors depending on the exit times Parameters ---------- ifig: int figure number dt: float time step people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere domain: Domain contains everything for managing the domain results: numpy array positions at each time axis: list matplotlib axis savefig: boolean writes the figure as a png file if true filename: string png filename used to write the figure """ fig = plt.figure(ifig) fig.clf() ax1 = fig.add_subplot(111) ax1.imshow(domain.image,interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower') I0 = results[:,0,0] J0 = results[:,1,0] exit_times = compute_exit_times(dt, people, results) tmp = domain.door_distance.copy() tmp[I0,J0] = exit_times people_init =,dtype=int), mask=domain.mask)[I0,J0] = 1 ax1.imshow(tmp*(people_init==1),interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower',alpha = 1.0,cmap='Greys') if (axis): ax1.set_xlim(axis[0],axis[1]) ax1.set_ylim(axis[2],axis[3]) ax1.set_xticks([]) ax1.set_yticks([]) # ax1.set_xlabel('x',color='white') # ax1.set_ylabel('y',color='white') # ax1.set_title('t',color='white') # ax1.spines['left'].set_color('white') # ax1.spines['right'].set_color('white') # ax1.spines['bottom'].set_color('white') # ax1.spines['top'].set_color('white') ax1.axis('off') fig.canvas.draw() if (savefig): fig.savefig(filename,dpi=150,bbox_inches='tight',pad_inches=0) plt.pause(0.01)
[docs]def plot_people_paths(ifig, dt, pixel_size, people, domain, results, axis = None, savefig=False, filename='fig.png'): """ To draw all the individual paths from intial time to final time Parameters ---------- ifig: int figure number dt: float time step pixel_size: float size of one pixel in meters people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere domain: Domain contains everything for managing the domain results: numpy array positions at each time axis: list matplotlib axis savefig: boolean writes the figure as a png file if true filename: string png filename used to write the figure """ Np = results.shape[0] fig = plt.figure(ifig) fig.clf() ax1 = fig.add_subplot(111) ax1.imshow(domain.image,interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower') I0 = results[:,0,0] J0 = results[:,1,0] exit_times = compute_exit_times(dt, people, results) tmp = domain.door_distance.copy() tmp[I0,J0] = exit_times people_init =,dtype=int), mask=domain.mask)[I0,J0] = 1 vmin = (tmp*(people_init==1)).min() vmax = (tmp*(people_init==1)).max() cmap = cm.Greys ax1.imshow(tmp*(people_init==1),interpolation='nearest', extent=[domain.xmin,domain.xmax,domain.ymin,domain.ymax], origin='lower',alpha = 1.0,cmap=cmap, norm=colors.BoundaryNorm(boundaries=sp.linspace(vmin,vmax,cmap.N+1), ncolors=cmap.N)) mask = (results==-1) traj = sp.swapaxes(,dtype=int, mask=mask),1,2) traj = traj[:,:,[1,0]] col = sp.array([]) for id in sp.arange(Np): ns = sp.where(results[id,0,:]!=-1)[0].shape[0]-1 col = sp.hstack((col,sp.ones(ns)*exit_times[id])) paths = LineCollection(0.5*pixel_size+pixel_size*traj, linewidths=2, linestyle="solid", cmap=cmap, norm=colors.BoundaryNorm( \ boundaries=sp.linspace(vmin,vmax,cmap.N+1), \ ncolors=cmap.N) \ ) paths.set_array(exit_times) ax1.add_collection(paths) if (axis): ax1.set_xlim(axis[0],axis[1]) ax1.set_ylim(axis[2],axis[3]) ax1.set_xticks([]) ax1.set_yticks([]) ax1.axis('off') # ax1.set_xlabel('x',color='white') # ax1.set_ylabel('y',color='white') # ax1.set_title('t',color='white') # ax1.spines['left'].set_color('white') # ax1.spines['right'].set_color('white') # ax1.spines['bottom'].set_color('white') # ax1.spines['top'].set_color('white') #ax1.axis('off') fig.canvas.draw() if (savefig): fig.savefig(filename,dpi=150,bbox_inches='tight',pad_inches=0) plt.pause(0.01)
[docs]def sequential_update(people, people_ij, weight, shuffle = None, randomstate = None): """ To move all individuals sequentially according to the following rule: \ the update of individual i is determined in a stochastic way by computing \ transition probabilities on neighboring cells (including the current \ position of i). Parameters ---------- people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere people_ij: numpy array (i,j) for each individual weight: numpy array weights for the probabilities in order to move in such a way like to \ reach the door shuffle: string shuffle kind ('random' or 'random_frozen'): if the sequential order \ changes or not at each time randomstate: numpy randomstate create a new one or reuse the given random state Returns ------- people: numpy masked arrays new positions equal to 1 if the cell (i,j) is occupied, 0 otherwise people_id: numpy masked arrays new people index (i,j) """ Np = people_ij.shape[0] height, width = people.shape if (randomstate == None): randomstate = sp.random.RandomState() #print("---------- sequential_update : shuffle = ",shuffle) ## Default : no shuffle update order = sp.arange(Np) if (shuffle == 'random'): ## Random shuffle update randomstate.shuffle(order) elif (shuffle == 'random_frozen'): ## Frozen shuffle update local_seed = 0.5 random.shuffle(order, lambda: local_seed) for id in order: i = people_ij[id,0]; j = people_ij[id,1] #print("--------- sequential_update : id = ",id," i = ",i," j = ",j) if ((i!=-1) and (j!=-1)): I = [i]; J = [j] if (i>=1): im = i-1 if ( (people[im,j] is not and (people[im,j] !=1 ) ): I.append(im); J.append(j) if (i<=height-2): ip = i+1 if ( (people[ip,j] is not and (people[ip,j] !=1 ) ): I.append(ip); J.append(j) if (j>=1): jm = j-1 if ( (people[i,jm] is not and (people[i,jm] !=1 ) ): I.append(i); J.append(jm) if (j<=width-2): jp = j+1 if ( (people[i,jp] is not and (people[i,jp] !=1 ) ): I.append(i); J.append(jp) w = weight[I,J]/sp.sum(weight[I,J]) #print("--------- sequential_update : I = ",I," J = ", J," w = ",w) pos = randomstate.choice(len(I), 1, p=w) #print("--------- sequential_update : pos = ",pos)[i,j] = 0[I[pos[0]],J[pos[0]]] = 1 people_ij[id,0] = I[pos[0]] people_ij[id,1] = J[pos[0]] #print("===> Move : people_ij = ",people_ij) return people, people_ij
[docs]def parallel_update(people, people_ij, weight, friction = 0, randomstate = None): """ To move all individuals in parallel according to the following rule: \ first, desired moves are precomputed and then the conflicts (two individuals \ at the same position) are resolved Parameters ---------- people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere people_ij: numpy array (i,j) for each individual weight: numpy array weights for the probabilities in order to move in such a way like to \ reach the door friction: float to designate the effect induced by a modified handling of conflicts, \ friction is the probability that a conflict remains unresolved (no one \ moves) randomstate: numpy randomstate create a new one or reuse the given random state Returns ------- people: numpy masked arrays new positions equal to 1 if the cell (i,j) is occupied, 0 elsewhere people_id: numpy masked arrays new people index (i,j) """ height, width = people.shape Np = people_ij.shape[0] if (randomstate == None): randomstate = sp.random.RandomState() new_ij = -sp.ones((Np,3)) ## new_i new_j proba for id in sp.arange(Np): i = people_ij[id,0]; j = people_ij[id,1] if ((i!=-1) and (j!=-1)): I = [i]; J = [j] if (i>=1): im = i-1 if ( (people[im,j] is not and (people[im,j] !=1 ) ): I.append(im); J.append(j) if (i<=height-2): ip = i+1 if ( (people[ip,j] is not and (people[ip,j] !=1 ) ): I.append(ip); J.append(j) if (j>=1): jm = j-1 if ( (people[i,jm] is not and (people[i,jm] !=1 ) ): I.append(i); J.append(jm) if (j<=width-2): jp = j+1 if ( (people[i,jp] is not and (people[i,jp] !=1 ) ): I.append(i); J.append(jp) w = weight[I,J]/sp.sum(weight[I,J]) pos = randomstate.choice(len(I), 1, p=w) new_ij[id,0] = I[pos[0]] new_ij[id,1] = J[pos[0]] new_ij[id,2] = w[pos[0]] ind = sp.where(new_ij[:,0]!=-1)[0] # Find the conflicts import numpy as np unique_ij, index, inverse, counts = np.unique(new_ij[ind,:2], axis=0, return_index=True, return_inverse=True, return_counts=True ) ## numpy 1.13 only !! if (unique_ij.shape[0]<ind.shape[0]): for ic,cc in enumerate(counts): if (cc>1): ## Conflict... ind_conf = sp.where(inverse==ic)[0] ## Positions of people concerned by the Conflicts npc = ind_conf.shape[0] ## Number of concerned persons print("---------- Conflicts between ",npc, " persons") ## No agent will move to the empty cell with the probability "friction" choice = randomstate.choice(2,1,p=[friction, 1-friction])[0] ## 0 = no agent move... if (choice == 0): print("---------- => Friction... No agent move...") for id in sp.arange(ind_conf.shape[0]): new_ij[ind[ind_conf[id]],:2] = people_ij[ind[ind_conf[id]],:2] else: print("---------- => One agent move...") w = new_ij[ind[ind_conf],2]/sp.sum(new_ij[ind[ind_conf],2]) pos = randomstate.choice(ind_conf.shape[0], 1, p=w) ## Winner for id in sp.arange(ind_conf.shape[0]): if (id!=pos): new_ij[ind[ind_conf[id]],:2] = people_ij[ind[ind_conf[id]],:2] ## Update for id in sp.arange(Np): i = people_ij[id,0]; j = people_ij[id,1] I = int(new_ij[id,0]) J = int(new_ij[id,1])[i,j] = 0[I,J] = 1 people_ij[id,0] = I people_ij[id,1] = J #print("===> Move : people_ij = ",people_ij) return people, people_ij
[docs]def exit(domain, people, people_ij): """ To update people and people_ij arrays in removing individuals who \ left the domain Parameters ---------- domain: Domain contains everything for managing the domain people: numpy masked arrays equal to 1 if the cell (i,j) is occupied, 0 elsewhere people_ij: numpy array (i,j) for each individual Returns ------- people: numpy masked arrays new positions equal to 1 if the cell (i,j) is occupied, 0 elsewhere people_id: numpy masked arrays new people index (i,j) """ Sexit = 0.2 ind_in = sp.where(people_ij[:,0] != -1)[0] d = domain.door_distance[people_ij[ind_in,0],people_ij[ind_in,1]] dind = sp.where(d<Sexit)[0] if (dind.shape[0]>0):[ people_ij[ind_in[dind],0], people_ij[ind_in[dind],1] ] = 0 people_ij[ind_in[dind],:] = [-1, -1] #print("===> Exit : people_ij = ",people_ij) return people, people_ij