Source code for workflow.scripts.ts_simulators

## ts_simulators outputs tree sequence. This cannot be used as a simulator for simulate_for_sbi!

import tskit
import msprime
import demes
import torch
import numpy as np
import stdpopsim
from sbi.utils import BoxUniform


[docs] class BaseSimulator: def __init__(self, config: dict, default: dict): for key in config: if key == "class_name": continue assert key in default, f"Option {key} not available for simulator" for key, default in default.items(): setattr(self, key, config.get(key, default))
[docs] class YRI_CEU(BaseSimulator): """ Simulate a model defined in dadi's manual (https://dadi.readthedocs.io/en/latest/examples/YRI_CEU/YRI_CEU/). Ancestral population changes in size; splits into YRI/CEU; CEU population undergoes a bottleneck upon splitting and subsequently grows exponentially. There is continuous symmetric migration between YRI and CEU. """ default_config = { # FIXED PARAMETERS "samples": {"YRI": 10, "CEU": 10}, "sequence_length": 10e6, "recombination_rate": 1.5e-8, "mutation_rate": 1.5e-8, # RANDOM PARAMETERS (UNIFORM) "N_A": [1e2, 1e5], "N_YRI": [1e2, 1e5], "N_CEU_initial": [1e2, 1e5], "N_CEU_final": [1e2, 1e5], "M": [0, 5e-4], "Tp": [0, 6e4], "T": [0, 6e4], } def __init__(self, config: dict): super().__init__(config, self.default_config) self.parameters = ["N_A", "N_YRI", "N_CEU_initial", "N_CEU_final", "M", "Tp", "T"] self.prior = BoxUniform( low=torch.tensor([getattr(self, p)[0] for p in self.parameters]), high=torch.tensor([getattr(self, p)[1] for p in self.parameters]), ) def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): torch.manual_seed(seed) theta = self.prior.sample().numpy() N_A, N_YRI, N_CEU_initial, N_CEU_final, M, Tp, T = theta graph = demes.Builder() graph.add_deme( "ancestral", epochs=[dict(start_size=N_A, end_time=Tp + T)] ) graph.add_deme( "AMH", ancestors=["ancestral"], epochs=[dict(start_size=N_YRI, end_time=T)], ) graph.add_deme( "CEU", ancestors=["AMH"], epochs=[dict(start_size=N_CEU_initial, end_size=N_CEU_final)], ) graph.add_deme( "YRI", ancestors=["AMH"], epochs=[dict(start_size=N_YRI)], ) graph.add_migration(demes=["CEU", "YRI"], rate=M) demog = msprime.Demography.from_demes(graph.resolve()) ts = msprime.sim_ancestry( self.samples, demography=demog, sequence_length=self.sequence_length, recombination_rate=self.recombination_rate, random_seed=seed, ) ts = msprime.sim_mutations(ts, rate=self.mutation_rate, random_seed=seed) return ts, theta
[docs] class DroMel_CO_FR(BaseSimulator): """ Simulate a two-population isolation-with-migration model for Drosophila melanogaster, Congolese and French populations, with growth in the French population """ default_config = { # FIXED PARAMETERS "samples": {"CO": 10, "FR": 10}, "sequence_length": 1e6, "recombination_rate": [0.5e-8, 3.0e-8], "mutation_rate": 5.49e-9, "haploid": True, # RANDOM PARAMETERS (UNIFORM IN LOGSPACE) "N_ANC": [3.5, 6.5], # log10 ancestral population size "N_CO": [3.5, 6.5], # log10 Congolese population size "N_FR0": [3.5, 6.5], # log10 French population size after split "N_FR1": [3.5, 6.5], # log10 French population size after growth "T": [3, 6], # log10 split time "m_CO_FR": [-8, -3], # log10 migration Congolese to French "m_FR_CO": [-8, -3], # log10 migration French to Congolese } def __init__(self, config: dict): super().__init__(config, self.default_config) self.parameters = ["N_ANC", "N_CO", "N_FR0", "N_FR1", "T", "m_CO_FR", "m_FR_CO"] self.prior = BoxUniform( low=torch.tensor([getattr(self, p)[0] for p in self.parameters]), high=torch.tensor([getattr(self, p)[1] for p in self.parameters]), ) def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): torch.manual_seed(seed) theta = self.prior.sample().numpy() seeds = torch.randint(2 ** 32 - 1, (2, )).numpy() r = self.recombination_rate[0] + \ (self.recombination_rate[1] - self.recombination_rate[0]) * \ torch.rand(1).item() N_ANC, N_CO, N_FR0, N_FR1, T, m_CO_FR, m_FR_CO = (10 ** theta) G_FR = np.log(N_FR1 / N_FR0) / T demogr = msprime.Demography() demogr.add_population(name="CO", initial_size=N_CO) demogr.add_population(name="FR", initial_size=N_FR1, growth_rate=G_FR) demogr.add_population(name="ANC", initial_size=N_ANC) demogr.migration_matrix = np.array([[0, m_CO_FR, 0], [m_FR_CO, 0, 0], [0, 0, 0]]) demogr.add_population_split(time=T, derived=["CO", "FR"], ancestral="ANC") samples = [ msprime.SampleSet(n, population=p, ploidy=1 if self.haploid else 2) for p, n in self.samples.items() ] ts = msprime.sim_ancestry( samples, demography=demogr, sequence_length=self.sequence_length, recombination_rate=r, random_seed=seeds[0], ) ts = msprime.sim_mutations( ts, rate=self.mutation_rate, random_seed=seeds[1], ) return ts, theta
[docs] class AraTha_2epoch(BaseSimulator): """ Simulate the African2Epoch_1H18 model from stdpopsim for Arabidopsis thaliana. The model consists of a single population that undergoes a size change. """ species = stdpopsim.get_species("AraTha") model = species.get_demographic_model("African2Epoch_1H18") default_config = { # FIXED PARAMETERS "samples": {"SouthMiddleAtlas": 10}, "sequence_length": 10e6, # RANDOM PARAMETERS (UNIFORM) "nu": [0.01, 1], # Ratio of current to ancestral population size "T": [0.01, 1.5], # Time of size change (scaled) } def __init__(self, config: dict): super().__init__(config, self.default_config) self.parameters = ["nu", "T"] self.prior = BoxUniform( low=torch.tensor([getattr(self, p)[0] for p in self.parameters]), high=torch.tensor([getattr(self, p)[1] for p in self.parameters]), ) def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): torch.manual_seed(seed) theta = self.prior.sample().numpy() nu, T = theta species = self.species contig = species.get_contig( length=self.sequence_length, ) model = self.model # Scale the population size and time parameters N_A = model.model.events[0].initial_size # ancestral population size model.populations[0].initial_size = nu * N_A # current population size model.model.events[0].time = T * 2 * N_A # time of size change engine = stdpopsim.get_engine("msprime") ts = engine.simulate( model, contig, samples=self.samples, random_seed=seed ) return ts, theta
[docs] class VariablePopulationSize(BaseSimulator): """ Simulate a model with varying population size across multiple time windows. The model consists of a single population that undergoes multiple size changes. """ default_config = { # FIXED PARAMETERS "samples": {"pop": 10}, "sequence_length": 10e6, "mutation_rate": 1.5e-8, "num_time_windows": 3, # RANDOM PARAMETERS (UNIFORM) "pop_sizes": [1e2, 1e5], # Range for population sizes (log10 space) "recomb_rate": [1e-9, 2e-8], # Range for recombination rate # TIME PARAMETERS "max_time": 100000, # Maximum time for population events "time_rate": 0.1, # Rate at which time changes across windows } def __init__(self, config: dict): super().__init__(config, self.default_config) # Set up parameters list self.parameters = [f"N_{i}" for i in range(self.num_time_windows)] + ["recomb_rate"] # Create parameter ranges in the same format as AraTha_2epoch # Population sizes (in log10 space) pop_size_ranges = [[np.log10(self.pop_sizes[0]), np.log10(self.pop_sizes[1])] for _ in range(self.num_time_windows)] # Add recombination rate range param_ranges = pop_size_ranges + [self.recomb_rate] # Set up prior using BoxUniform self.prior = BoxUniform( low=torch.tensor([r[0] for r in param_ranges]), high=torch.tensor([r[1] for r in param_ranges]) ) # Calculate fixed time points for population size changes self.change_times = self._calculate_change_times() def _calculate_change_times(self) -> np.ndarray: """Calculate the times at which population size changes occur using an exponential spacing.""" #times = [(np.exp(np.log(1 + self.time_rate * self.max_time) * i / # (self.num_time_windows - 1)) - 1) / self.time_rate # for i in range(self.num_time_windows)] win = np.logspace(2, np.log10(self.max_time), self.num_time_windows) win[0] = 0 times = win return np.around(times).astype(int) def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): if seed is not None: torch.manual_seed(seed) min_snps = 400 max_attempts = 100 attempt = 0 while attempt < max_attempts: # Sample parameters directly from prior (like AraTha_2epoch) theta = self.prior.sample().numpy() # Convert population sizes from log10 space pop_sizes = 10 ** theta[:-1] # All but last element are population sizes recomb_rate = theta[-1] # Last element is recombination rate # Create demography demography = msprime.Demography() demography.add_population(name="pop0", initial_size=float(pop_sizes[0])) # Add population size changes at calculated time intervals for i in range(1, len(pop_sizes)): demography.add_population_parameters_change( time=self.change_times[i], initial_size=float(pop_sizes[i]), growth_rate=0, population="pop0" ) # Simulate ancestry ts = msprime.sim_ancestry( samples={"pop0": self.samples["pop"]}, demography=demography, sequence_length=self.sequence_length, recombination_rate=recomb_rate, random_seed=seed ) # Add mutations ts = msprime.sim_mutations(ts, rate=self.mutation_rate, random_seed=seed) # Check if we have enough SNPs after MAF filtering geno = ts.genotype_matrix().T num_sample = geno.shape[0] if (geno==2).any(): num_sample *= 2 row_sum = np.sum(geno, axis=0) keep = np.logical_and.reduce([ row_sum != 0, row_sum != num_sample, row_sum > num_sample * 0.05, num_sample - row_sum > num_sample * 0.05 ]) if np.sum(keep) >= min_snps: return ts, theta attempt += 1 if seed is not None: seed += 1 raise RuntimeError(f"Failed to generate tree sequence with at least {min_snps} SNPs after {max_attempts} attempts")
[docs] class recombination_rate(BaseSimulator): """ Simulate a one population model where recombination rate varies among replicates. The prior is a beta distribution shifted/scaled to a given interval (by default, a noninformative beta). """ default_config = { # FIXED PARAMETERS "samples": {0: 10}, "sequence_length": 1e6, "mutation_rate": 1.5e-8, "pop_size": 1e4, "mean_and_dispersion": [0.5, 0.5], # beta mean and dispersion # RANDOM PARAMETERS (UNIFORM) "recombination_rate": [0, 1e-8], # bounds on recombination rate } def __init__(self, config: dict): super().__init__(config, self.default_config) self.parameters = ["recombination_rate"] self.prior = BoxUniform( low=torch.tensor([getattr(self, p)[0] for p in self.parameters]), high=torch.tensor([getattr(self, p)[1] for p in self.parameters]), ) def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): torch.manual_seed(seed) mean, dispersion = self.mean_and_dispersion alpha, beta = mean / dispersion, (1 - mean) / dispersion prior = torch.distributions.Beta(torch.FloatTensor([alpha]), torch.FloatTensor([beta])) low, high = self.recombination_rate theta = low + (high - low) * prior.sample().numpy() recombination_rate = theta.item() ts = msprime.sim_ancestry( self.samples, population_size=self.pop_size, sequence_length=self.sequence_length, recombination_rate=recombination_rate, random_seed=seed, discrete_genome=False, # don't want overlapping mutations ) ts = msprime.sim_mutations(ts, rate=self.mutation_rate, random_seed=seed) return ts, theta
[docs] class DependentVariablePopulationSize(BaseSimulator): """ Simulate a population with variable population size across multiple time windows, with each population size dependent on the previous one. The model consists of a single population that undergoes multiple size changes. """ default_config = { # FIXED PARAMETERS "samples": {"pop0": 25}, "sequence_length": 2e6, "mutation_rate": 1e-8, "num_time_windows": 21, "maf": 0.05, # RANDOM PARAMETERS (UNIFORM) "pop_sizes": [1e2, 1e5], # Range for population sizes (log10 space) "pop_changes": [-1, 1], # Range for population size changes (* 10 ** beta) "recomb_rate": [1e-9, 1e-8], # Range for recombination rate # TIME PARAMETERS "max_time": 130000, # Maximum time for population events "time_rate": 0.1, # Rate at which time changes across windows } def __init__(self, config: dict): super().__init__(config, self.default_config) # Set up parameters list self.parameters = [f"N_{i}" for i in range(self.num_time_windows)] + ["recomb_rate"] # Create parameter ranges in the same format as AraTha_2epoch # Population sizes (in log10 space) pop_size_range = [[np.log10(self.pop_sizes[0]), np.log10(self.pop_sizes[1])] for _ in range(self.num_time_windows)] pop_change_ranges = [[self.pop_changes[0], self.pop_changes[1]] for _ in range(1,self.num_time_windows)] # Add recombination rate range param_ranges = pop_size_range + [self.recomb_rate] # Set up prior using BoxUniform self.prior = BoxUniform( low=torch.tensor([r[0] for r in param_ranges]), high=torch.tensor([r[1] for r in param_ranges]) ) self.beta_prior = BoxUniform( low=torch.tensor([b[0] for b in pop_change_ranges]), high=torch.tensor([b[1] for b in pop_change_ranges]) ) # Calculate fixed time points for population size changes self.change_times = self._calculate_change_times() def _calculate_change_times(self) -> np.ndarray: """Calculate the times at which population size changes occur using an exponential spacing.""" times = [(np.exp(np.log(1 + self.time_rate * self.max_time) * i / (self.num_time_windows - 1)) - 1) / self.time_rate for i in range(self.num_time_windows)] return np.around(times).astype(int) def _generate_dependent_pop_sizes(self) -> np.ndarray: """ Generate a sequence of population sizes where the first population size N_1 is sampled from a uniform distribution corresponding to the most recent time window. The following population sizes are generated following N_i = N_{i-1} * 10 ^ β for i in [2,...,num_time_windows], unless N_i is outside of pop_ranges. If so N_i is set to the max/min population size """ prior_sample = self.prior.sample().numpy() # only the first sample from prior will be used beta_sample = self.beta_prior.sample().numpy() modified_prior = prior_sample.copy() # The first value is uniformly sampled within the log10 bounds for i in range(self.num_time_windows-1): # For subsequent time windows, calculate the new value based on the previous one and beta new_value = modified_prior[i] + beta_sample[i] # If the new value is outside the bounds, set it to the max/min if new_value > np.log10(self.pop_sizes[1]): new_value = np.log10(self.pop_sizes[1]) if new_value < np.log10(self.pop_sizes[0]): new_value = np.log10(self.pop_sizes[0]) modified_prior[i+1] = new_value # Return the sampled prior and recombination rate return modified_prior def __call__(self, seed: int = None) -> (tskit.TreeSequence, np.ndarray): if seed is not None: torch.manual_seed(seed) min_snps = 400 max_attempts = 100 attempt = 0 while attempt < max_attempts: # Sample parameters directly from prior (like AraTha_2epoch) theta = self._generate_dependent_pop_sizes() pop_sizes = 10**theta[:-1] # All but last element are for the population sizes recomb_rate = theta[-1] # Last element is recombination rate # Create demography demography = msprime.Demography() demography.add_population(name="pop0", initial_size=float(pop_sizes[0])) # Add population size changes at calculated time intervals for i in range(1, len(pop_sizes)): demography.add_population_parameters_change( time=self.change_times[i], initial_size=float(pop_sizes[i]), growth_rate=0, population="pop0" ) # Simulate ancestry ts = msprime.sim_ancestry( samples=self.samples, demography=demography, sequence_length=self.sequence_length, recombination_rate=recomb_rate, random_seed=seed ) # Add mutations ts = msprime.sim_mutations(ts, rate=self.mutation_rate, random_seed=seed) # Check if we have enough SNPs after MAF filtering geno = ts.genotype_matrix().T num_sample = ts.num_samples row_sum = np.sum(geno, axis=0) keep = np.logical_and.reduce([ row_sum > num_sample * self.maf, num_sample - row_sum > num_sample * self.maf ]) if np.sum(keep) >= min_snps: return ts, theta attempt += 1 if seed is not None: seed += 1 raise RuntimeError(f"Failed to generate tree sequence with at least {min_snps} SNPs after {max_attempts} attempts")