Examples
Example Projects
- Tocketry with FastAPI (and React)
A Tocketry template that has FastAPI (REST API) integrated and also has a web UI built with React.
Example Snippets
Minimal example:
from tocketry import Tocketry

app = Tocketry()


@app.task("daily")
def do_things():
    """Placeholder task registered with the "daily" start condition."""
    ...


if __name__ == "__main__":
    # Run the app only when this file is executed as a script.
    app.run()
Basic example:
from tocketry import Tocketry

app = Tocketry()


@app.task("daily")
def do_things():
    """Placeholder task registered with the "daily" start condition."""
    ...


@app.task("after task 'do_things'")
def do_after_things():
    """Placeholder task whose start condition refers to the task 'do_things'."""
    ...


if __name__ == "__main__":
    # Run the app only when this file is executed as a script.
    app.run()
Intermediate example:
from tocketry import Tocketry
from tocketry.args import Return, Session, Arg, FuncArg
from tocketry.conds import daily, time_of_week, after_success

app = Tocketry()


@app.cond()
def is_foo():
    "This is a custom condition"
    ...
    return True


@app.task(daily & is_foo)
def do_daily():
    "This task runs once a day when foo is true"
    ...
    return ...


@app.task(
    (daily.at("10:00") | daily.at("19:00")) & time_of_week.between("Mon", "Fri"),
    execution="process",
)
def do_complex():
    "This task runs on complex interval and on separate process"
    ...
    return ...


@app.task(after_success(do_daily))
def do_after_another(arg=Return(do_daily)):
    """This task runs after 'do_daily' and takes its
    return value as an input"""
    ...


@app.task(daily)
def do_with_params(arg1=FuncArg(lambda: ...), arg2=Arg("myparam")):
    """This task runs with variety of arguments"""
    ...


@app.task(daily, execution="thread")
def do_on_session(session=Session()):
    "This task modifies the scheduling session"
    # Setting a task to run
    for task in session.tasks:
        if task.name == "do_after_another":
            task.run(arg="...")

    # Call for shut down
    session.shut_down()


if __name__ == "__main__":
    # Provide the session parameter that Arg("myparam") refers to before running.
    app.params(myparam="...")
    app.run()
Advanced example:
from tocketry import Tocketry
from tocketry.args import Return, Arg

app = Tocketry()

# Custom Condition
# ----------------


@app.cond("is foo")
def is_foo():
    # This is a custom condition
    ...
    return True


# Parameters
# ----------

app.params(my_arg="Hello")


@app.param("item")
def get_item():
    # This is a custom parameter
    ...
    return "world"


# Tasks
# -----


@app.task("daily", execution="process")
def do_on_process():
    "This task runs once a day and runs on separate process"
    ...
    return ...


# The condition names 'do_on_process' so that it matches the task whose
# return value is consumed below (this script defines no task 'do_things').
@app.task("after task 'do_on_process'")
def do_pipeline(arg1=Return("do_on_process"), arg2=Arg("item"), arg3=Arg("my_arg")):
    """This task runs when 'do_on_process' has succeeded.
    Argument 'arg1' gets the return value of 'do_on_process'
    Argument 'arg2' gets the return value of function 'get_item'
    Argument 'arg3' is simply the value of a session parameter 'my_arg'"""
    ...


@app.task("daily & is foo", execution="thread")
def do_custom():
    """This task runs once a day and when is_foo returns True
    This task runs on separate thread"""
    ...


@app.task("(true & true) | (false & True & ~True)")
def do_complex():
    """Notice the logical expression in the task start condition"""


if __name__ == "__main__":
    # Run the app only when this file is executed as a script.
    app.run()