import datetime
import os.path
from sqlalchemy.orm import aliased, contains_eager, selectinload
import pandas as pd
from collections import OrderedDict
from .database import Database
from .schema import schema_version, default_sample_rate
from ..synphys_cache import get_db_path, list_db_versions

[docs]class SynphysDatabase(Database): """Augments the Database class with convenience methods for querying the synphys database. """ default_sample_rate = default_sample_rate schema_version = schema_version mouse_projects = ["mouse V1 coarse matrix", "mouse V1 pre-production"] human_projects = ["human coarse matrix"]
[docs] @classmethod def load_sqlite(cls, sqlite_file, readonly=True): """Return a SynphysDatabase instance connected to an existing sqlite file. """ ro_host = 'sqlite:///' rw_host = None if readonly else ro_host return cls(ro_host, rw_host, db_name=sqlite_file)
[docs] @classmethod def load_current(cls, db_size): """Load the most recent version of the database that is supported by this version of aisynphys. The database file will be downloaded and cached, if an existing cache file is not found. Parameters ---------- db_size : str Must be one of 'small', 'medium', or 'full'. """ assert db_size in ('small', 'medium', 'full'), "db_size argument must be one of 'small', 'medium', or 'full'" current = cls.list_current_versions() if db_size not in current: raise Exception(f"This version of aisynphys requires database schema version {cls.schema_version}, " "but no released database files were found with this schema version.") if current[db_size] is None: raise Exception(f"There are no published {db_size} database files for the current schema version ({cls.schema_version}).") return cls.load_version(current[db_size]['db_file'])
[docs] @classmethod def list_current_versions(cls): """Return a dict of the most recent DB versions for each size. If no published DB file is available for a particular size, then the value will be set to None in the returned dictionary. """ versions_available = list_db_versions() current_versions = {} for size in ('small', 'medium', 'full'): versions_with_size = [desc for desc in versions_available if desc['db_size'] == size and desc['schema_version'] == cls.schema_version] if len(versions_with_size) == 0: current_versions[size] = None else: current_versions[size] = versions_with_size[-1] return current_versions
[docs] @classmethod def list_versions(cls, only_supported=False): """Return a list of all available database versions. Each item in the list is a dictionary with keys db_file, release_version, db_size, and schema_version. """ vers = list_db_versions() if only_supported: vers = [v for v in vers if v['schema_version'] == cls.schema_version] return vers
[docs] @classmethod def load_version(cls, db_version): """Load a named database version. Available database names can be listed using :func:`list_versions`. The database file will be downloaded and cached, if an existing cache file is not found. Example:: >>> from aisynphys.database import SynphysDatabase >>> SynphysDatabase.list_versions() [ {'db_file': 'synphys_r1.0_small.sqlite'}, {'db_file': 'synphys_r1.0_medium.sqlite'}, {'db_file': 'synphys_r1.0_full.sqlite'}, ... ] >>> db = SynphysDatabase.load_version('synphys_r1.0_small.sqlite') Downloading => /home/luke/docs/aisynphys/doc/cache/database/synphys_r1.0_2019-08-29_small.sqlite [####################] 100.00% (73.13 MB / 73.1 MB) 4.040 MB/s 0:00:00 remaining done. """ db_file = get_db_path(db_version) db = SynphysDatabase.load_sqlite(db_file, readonly=True) return db
def __init__(self, ro_host, rw_host, db_name, check_schema=True): from .schema import ORMBase Database.__init__(self, ro_host, rw_host, db_name, ORMBase) self._project_names = None if check_schema: try: self._check_version() except Exception: # need to clean up carefully since this can introduce a persistent connection self.default_session.rollback() self.dispose_engines() raise @property def version_name(self): """The version name of this database, as accepted by SynphysDatabase.load_version() """ return os.path.split(self.db_name)[-1]
[docs] def initialize_database(self): """Optionally called after create_tables """ # initialize or verify db version mrec = self.metadata_record() if mrec is None: mrec = self.Metadata(meta={ 'db_version': schema_version, 'creation_date':'%Y-%m-%d'), 'origin': "Allen Institute for Brain Science / Synaptic Physiology", }) s = self.session(readonly=False) s.add(mrec) s.commit() else: self._check_version()
def _check_version(self): mrec = self.metadata_record() if mrec is not None: ver = mrec.meta['db_version'] assert ver == schema_version, "Database {self} has unsupported schema version {ver} (expected {schema_version})".format(locals()) def metadata_record(self, session=None): if session is None: session = self.session() close_session = True else: close_session = False try: recs = session.query(self.Metadata).all() if len(recs) == 0: return None elif len(recs) > 1: raise Exception("Multiple metadata records found.") return recs[0] finally: if close_session: session.close() def slice_from_timestamp(self, ts, session=None): session = session or self.default_session slices = session.query(self.Slice).filter(self.Slice.acq_timestamp==ts).all() if len(slices) == 0: raise KeyError("No slice found for timestamp %0.3f" % ts) elif len(slices) > 1: raise KeyError("Multiple slices found for timestamp %0.3f" % ts) return slices[0] def experiment_from_timestamp(self, ts, session=None): session = session or self.default_session expts = session.query(self.Experiment).filter(self.Experiment.acq_timestamp==ts).all() if len(expts) == 0: # For backward compatibility, check for timestamp truncated to 2 decimal places for expt in session.query(self.Experiment).all(): if abs((expt.acq_timestamp - ts)) < 0.01: return expt raise KeyError("No experiment found for timestamp %0.3f" % ts) elif len(expts) > 1: raise RuntimeError("Multiple experiments found for timestamp %0.3f" % ts) return expts[0] def experiment_from_ext_id(self, ext_id, session=None): session = session or self.default_session expts = session.query(self.Experiment).filter(self.Experiment.ext_id==ext_id).all() if len(expts) == 0: raise KeyError('No experiment found for ext_id %s' %ext_id) elif len(expts) > 1: raise RuntimeError("Multiple experiments found for ext_id %s" %ext_id) return expts[0] def slice_from_ext_id(self, ext_id, session=None): session = session or self.default_session slices = session.query(self.Slice).filter(self.Slice.ext_id==ext_id).all() if len(slices) == 0: raise KeyError("No slice found for ext_id %s" % ext_id) elif len(slices) > 1: raise KeyError("Multiple slices found for ext_id %s" % ext_id) return slices[0] def list_experiments(self, session=None): session = session or self.default_session return session.query(self.Experiment).all() def list_project_names(self, session=None, cache=True): session = session or self.default_session if cache is False or self._project_names is None: self._project_names = [rec[0] for rec in session.query(self.Experiment.project_name).distinct().all()] return self._project_names
[docs] def pair_query(self, pre_class=None, post_class=None, synapse=None, synapse_type=None, synapse_probed=None, electrical=None, experiment_type=None, project_name=None, acsf=None, age=None, species=None, distance=None, internal=None, preload=(), session=None, filter_exprs=None): """Generate a query for selecting pairs from the database. Parameters ---------- pre_class : :class:`aisynphys.cell_class.CellClass` | None Filter for pairs where the presynaptic cell belongs to this class post_class : :class:`aisynphys.cell_class.CellClass` | None Filter for pairs where the postsynaptic cell belongs to this class synapse : bool | None Include only pairs that are (or are not) connected by a chemical synapse synapse_type : str | None Include only synapses of a particular type ('ex' or 'in') synapse_probed : bool | None If True, include only pairs that were probed for a synaptic connection (regardless of whether a connectin was found) electrical : bool | None Include only pairs that are (or are not) connected by an electrical synapse (gap junction) experiment_type : str | None Include only data from specific types of experiments project_name : str | list | None Value(s) to match from experiment.project_name (e.g. "mouse V1 coarse matrix" or "human coarse matrix") acsf : str | list | None Filter for ACSF recipe name(s) age : tuple | None (min, max) age ranges to filter for. Either limit may be None to disable that check. species : str | None Species ('mouse' or 'human') to filter for distance : tuple | None (min, max) intersomatic distance in meters internal : str | list | None Electrode internal solution recipe name(s) preload : list List of strings specifying resources to preload along with the queried pairs. This can speed up performance in cases where these would otherwise be individually queried later on. Options are: - "experiment" (includes experiment and slice) - "cell" (includes cell, morphology, cortical_location, and patch_seq) - "synapse" (includes synapse, resting_state, dynamics, and synapse_model) - "synapse_prediction" (includes only synapse_prediction) filter_exprs : list | None List of sqlalchemy expressions, each of which will restrict the query via a call to query.filter(expr) """ if experiment_type == 'standard multipatch': assert project_name is None, "cannot specify both experiment_type and project_name" project_name = ['mouse V1 coarse matrix', 'mouse V1 pre-production', 'human coarse matrix'] session = session or self.default_session pre_cell = aliased(self.Cell, name='pre_cell') post_cell = aliased(self.Cell, name='post_cell') pre_morphology = aliased(self.Morphology, name='pre_morphology') post_morphology = aliased(self.Morphology, name='post_morphology') pre_patch_seq = aliased(self.PatchSeq, name='pre_patch_seq') post_patch_seq = aliased(self.PatchSeq, name='post_patch_seq') pre_intrinsic = aliased(self.Intrinsic, name='pre_intrinsic') post_intrinsic = aliased(self.Intrinsic, name='post_intrinsic') pre_location = aliased(self.CorticalCellLocation, name='pre_cortical_cell_location') post_location = aliased(self.CorticalCellLocation, name='post_cortical_cell_location') query = ( session.query(self.Pair) .join(pre_cell, .join(post_cell, .outerjoin(pre_morphology, .outerjoin(post_morphology, .outerjoin(pre_patch_seq, .outerjoin(post_patch_seq, .outerjoin(pre_intrinsic, .outerjoin(post_intrinsic, .outerjoin(pre_location, .outerjoin(post_location, .join(self.Experiment, .outerjoin(self.Slice, ## don't want to drop all pairs if we don't have slice or connection strength entries .outerjoin(self.Synapse, .outerjoin(self.SynapsePrediction, .outerjoin(self.Dynamics, .outerjoin(self.RestingStateFit, .outerjoin(self.SynapseModel, .outerjoin(self.Conductance, # .outerjoin(self.PolySynapse) # .outerjoin(self.GapJunction) ) if pre_class is not None: query = pre_class.filter_query(query, pre_cell, db=self) if post_class is not None: query = post_class.filter_query(query, post_cell, db=self) if synapse is not None: query = query.filter(self.Pair.has_synapse==synapse) if synapse_type is not None: assert synapse_type in ['ex', 'in', 'mixed'], "synapse_type must be 'ex', 'in', or 'mixed'" query = query.filter(self.Synapse.synapse_type==synapse_type) if synapse_probed is True: from ..connectivity import probed_pair_test_spike_limit query = query.filter( ((pre_cell.cell_class == 'ex') & (self.Pair.n_ex_test_spikes > probed_pair_test_spike_limit)) | ((pre_cell.cell_class == 'in') & (self.Pair.n_in_test_spikes > probed_pair_test_spike_limit)) | ((self.Pair.n_ex_test_spikes > probed_pair_test_spike_limit) & (self.Pair.n_in_test_spikes > probed_pair_test_spike_limit)) ) if electrical is not None: query = query.filter(self.Pair.has_electrical==electrical) if project_name is not None: names = [project_name] if isinstance(project_name, str) else project_name for name in names: assert name in self.list_project_names(), "project_name %r not found in database (see SynphysDatabase.list_project_names)" % name if isinstance(project_name, str): query = query.filter(self.Experiment.project_name==project_name) else: query = query.filter(self.Experiment.project_name.in_(project_name)) if acsf is not None: if isinstance(acsf, str): query = query.filter(self.Experiment.acsf==acsf) else: query = query.filter(self.Experiment.acsf.in_(acsf)) if age is not None: if age[0] is not None: query = query.filter(self.Slice.age>=age[0]) if age[1] is not None: query = query.filter(self.Slice.age<=age[1]) if distance is not None: if distance[0] is not None: query = query.filter(self.Pair.distance>=distance[0]) if distance[1] is not None: query = query.filter(self.Pair.distance<=distance[1]) if species is not None: query = query.filter(self.Slice.species==species) if internal is not None: if isinstance(internal, str): query = query.filter(self.Experiment.internal==internal) else: query = query.filter(self.Experiment.internal.in_(internal)) if filter_exprs is not None: for expr in filter_exprs: query = query.filter(expr) if 'experiment' in preload: query = ( query .add_entity(self.Experiment) .add_entity(self.Slice) ) query = query.options( contains_eager(self.Pair.experiment), contains_eager(self.Experiment.slice), ) if 'cell' in preload: query = ( query .add_entity(pre_cell) .add_entity(post_cell) .add_entity(pre_morphology) .add_entity(post_morphology) .add_entity(pre_patch_seq) .add_entity(post_patch_seq) .add_entity(pre_intrinsic) .add_entity(post_intrinsic) .add_entity(pre_location) .add_entity(post_location) ) query = query.options( contains_eager(self.Pair.pre_cell, alias=pre_cell), contains_eager(self.Pair.post_cell, alias=post_cell), contains_eager(pre_cell.morphology, alias=pre_morphology), contains_eager(post_cell.morphology, alias=post_morphology), contains_eager(pre_cell.patch_seq, alias=pre_patch_seq), contains_eager(post_cell.patch_seq, alias=post_patch_seq), contains_eager(pre_cell.intrinsic, alias=pre_intrinsic), contains_eager(post_cell.intrinsic, alias=post_intrinsic), contains_eager(pre_cell.cortical_location, alias=pre_location), contains_eager(post_cell.cortical_location, alias=post_location), ) if 'synapse' in preload: query = ( query .add_entity(self.Synapse) .add_entity(self.RestingStateFit) .add_entity(self.Dynamics) .add_entity(self.SynapseModel) ) query = query.options( contains_eager(self.Pair.synapse), contains_eager(self.Synapse.resting_state_fit), contains_eager(self.Pair.dynamics), contains_eager(self.Pair.synapse_model), ) if 'synapse_prediction' in preload: query = ( query .add_entity(self.SynapsePrediction) ) query = query.options( contains_eager(self.Pair.synapse_prediction), ) # package the aliased cells query.pre_cell = pre_cell query.post_cell = post_cell query.pre_morphology = pre_morphology query.post_morphology = post_morphology query.pre_location = pre_location query.post_location = post_location query.pre_intrinsic = pre_intrinsic query.post_intrinsic = post_intrinsic query.pre_patch_seq = pre_patch_seq query.post_patch_seq = post_patch_seq return query
[docs] def matrix_pair_query(self, pre_classes, post_classes, columns=None, pair_query_args=None): """Returns the concatenated result of running pair_query over every combination of presynaptic and postsynaptic cell class. """ if pair_query_args is None: pair_query_args = {} pairs = None for pre_name, pre_class in pre_classes.items(): for post_name, post_class in post_classes.items(): pair_query = self.pair_query( pre_class=pre_class, post_class=post_class, **pair_query_args ) if columns is not None: pair_query = pair_query.add_columns(*columns) df = pair_query.dataframe(rename_columns=False) df['pre_class'] = pre_name df['post_class'] = post_name if pairs is None: pairs = df else: pairs = pd.concat([pairs, df], axis=0, join='outer') return pairs
def __getstate__(self): """Allows DB to be pickled and passed to subprocesses. """ return { 'ro_host': self.ro_host, 'rw_host': self.rw_host, 'db_name': self.db_name, } def __setstate__(self, state): self.__init__(ro_host=state['ro_host'], rw_host=state['rw_host'], db_name=state['db_name'])