You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
3.6 KiB

from __future__ import absolute_import
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import datetime_to_utc_timestamp
class MemoryJobStore(BaseJobStore):
    """
    Stores jobs in an array in RAM. Provides no persistence support.

    Plugin alias: ``memory``
    """

    def __init__(self):
        """Initialize the base job store and create the empty in-memory containers."""
        super(MemoryJobStore, self).__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

    def lookup_job(self, job_id):
        """
        Returns the job stored under the given id.

        :param job_id: identifier of the job to look up
        :return: the matching job, or ``None`` if no job with that id is stored
        """
        return self._jobs_index.get(job_id, (None, None))[0]

    def get_due_jobs(self, now):
        """
        Returns the jobs whose stored timestamp is not later than ``now``.

        :param now: the reference point in time; converted with ``datetime_to_utc_timestamp``
        :return: list of due jobs, in the same order as they are kept in the store
        """
        now_timestamp = datetime_to_utc_timestamp(now)
        pending = []
        for job, timestamp in self._jobs:
            # The list is sorted by timestamp and entries without a timestamp sort last, so the
            # first entry that is not due means that none of the remaining ones are due either.
            if timestamp is None or timestamp > now_timestamp:
                break
            pending.append(job)

        return pending

    def get_next_run_time(self):
        """
        Returns the ``next_run_time`` of the first job in the sorted list.

        :return: the first job's ``next_run_time``, or ``None`` if the store is empty
        """
        return self._jobs[0][0].next_run_time if self._jobs else None

    def get_all_jobs(self):
        """
        Returns every stored job.

        :return: a new list of jobs, in the store's sort order
        """
        return [j[0] for j in self._jobs]

    def add_job(self, job):
        """
        Adds the given job to the store, keeping the job list sorted.

        :param job: the job to add; its ``id`` and ``next_run_time`` attributes are read
        :raises ConflictingIdError: if a job with the same id is already stored
        """
        if job.id in self._jobs_index:
            raise ConflictingIdError(job.id)

        timestamp = datetime_to_utc_timestamp(job.next_run_time)
        index = self._get_job_index(timestamp, job.id)
        self._jobs.insert(index, (job, timestamp))
        self._jobs_index[job.id] = (job, timestamp)

    def update_job(self, job):
        """
        Replaces the stored job that has the same id as the given job.

        :param job: the new version of the job; its ``id`` and ``next_run_time`` are read
        :raises JobLookupError: if no job with that id is stored
        """
        old_job, old_timestamp = self._jobs_index.get(job.id, (None, None))
        if old_job is None:
            raise JobLookupError(job.id)

        # If the next run time has not changed, simply replace the job in its present index.
        # Otherwise, reinsert the job to the list to preserve the ordering.
        old_index = self._get_job_index(old_timestamp, old_job.id)
        new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
        if old_timestamp == new_timestamp:
            self._jobs[old_index] = (job, new_timestamp)
        else:
            del self._jobs[old_index]
            new_index = self._get_job_index(new_timestamp, job.id)
            self._jobs.insert(new_index, (job, new_timestamp))

        # Store under the same key that was used for the lookup above.
        self._jobs_index[job.id] = (job, new_timestamp)

    def remove_job(self, job_id):
        """
        Removes the job with the given id from both the sorted list and the lookup table.

        :param job_id: identifier of the job to remove
        :raises JobLookupError: if no job with that id is stored
        """
        job, timestamp = self._jobs_index.get(job_id, (None, None))
        if job is None:
            raise JobLookupError(job_id)

        index = self._get_job_index(timestamp, job_id)
        del self._jobs[index]
        del self._jobs_index[job_id]

    def remove_all_jobs(self):
        """Discards every stored job by resetting the list and the lookup table."""
        self._jobs = []
        self._jobs_index = {}

    def shutdown(self):
        """Releases the jobs held by this store."""
        # NOTE(review): the method body was absent from the text under review; clearing the
        # store is the reconstructed behavior -- confirm against the upstream implementation.
        self.remove_all_jobs()

    def _get_job_index(self, timestamp, job_id):
        """
        Returns the index of the given job, or if it's not found, the index where the job should be
        inserted based on the given timestamp.

        The list is binary searched on the ``(timestamp, job id)`` pair. A ``None`` timestamp is
        treated as positive infinity, which places such jobs after all jobs that have a timestamp.

        :type timestamp: int
        :type job_id: str
        :return: index into ``self._jobs``

        """
        lo, hi = 0, len(self._jobs)
        timestamp = float('inf') if timestamp is None else timestamp
        while lo < hi:
            mid = (lo + hi) // 2
            mid_job, mid_timestamp = self._jobs[mid]
            mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp
            if mid_timestamp > timestamp:
                hi = mid
            elif mid_timestamp < timestamp:
                lo = mid + 1
            elif mid_job.id > job_id:
                # Equal timestamps: fall back to ordering by job id.
                hi = mid
            elif mid_job.id < job_id:
                lo = mid + 1
            else:
                return mid

        return lo