Svelte Stores are one of the secret weapons of the Svelte framework (recently voted the most loved web framework).
Stores allow easy reactive programming by presenting an Observer pattern that is as simple as necessary, but not simpler.
pip install sveltish
Sometimes, you’ll have values that need to be accessed by multiple unrelated objects. For that, you can use stores. A store is a very simple implementation (around 100 lines of code) of the Observer/Observable pattern.
A store is simply an object with a subscribe method that allows interested parties to be notified when its value changes.
from sveltish.stores import writable
count = writable(0)
history = [] # logging for testing
# subscribe returns an unsubscriber
def record(x):
    history.append(x)
    print(history)
stop = count.subscribe(record)
test_eq(history, [0])
[0]
We just created a count store. Its value can be accessed via a callback we pass to the count.subscribe method.
A Writable can be set from the outside. When that happens, all its subscribers react.
def increment(): count.update(lambda x: x + 1)
def decrement(): count.update(lambda x: x - 1)
def reset(): count.set(0)
count.set(3)
increment()
decrement()
decrement()
reset()
count.set(42)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
[0, 3]
[0, 3, 4]
[0, 3, 4, 3]
[0, 3, 4, 3, 2]
[0, 3, 4, 3, 2, 0]
[0, 3, 4, 3, 2, 0, 42]
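The behaviour shown so far can be approximated in a few lines of plain Python. This is only a rough sketch of the Observer idea, not the sveltish source: the MiniStore class and its attribute names are hypothetical and exist purely for illustration.

```python
class MiniStore:
    """Hypothetical, minimal observable value (not the sveltish implementation)."""

    def __init__(self, value=None):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)  # a new subscriber receives the current value at once

        def unsubscribe():
            if callback in self.subscribers:
                self.subscribers.remove(callback)
        return unsubscribe

    def set(self, new_value):
        if new_value != self.value:  # only notify on an actual change
            self.value = new_value
            for callback in list(self.subscribers):
                callback(new_value)

    def update(self, fn):
        self.set(fn(self.value))


seen = []
store = MiniStore(0)
stop = store.subscribe(seen.append)
store.set(1)
store.update(lambda x: x + 1)
stop()
store.set(99)  # nobody is listening any more
print(seen)  # [0, 1, 2]
```

The sketch compares values with != before notifying, which is a simplification that suits primitives only.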
The unsubscriber, in this example the stop function, stops the notifications to the subscriber.
stop()
reset()
count.set(22)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
count
w<0> $int: 22
Notice that you can still change the store, but there was no print message this time: no observer was listening.
When we subscribe new callbacks, they are promptly informed of the current state of the store.
stop = count.subscribe(lambda x: print(f"Count is now {x}"))
stop2 = count.subscribe(lambda x: print(f"double of count is {2*x}"))
Count is now 22
double of count is 44
reset()
Count is now 0
double of count is 0
stop()
stop2()
You can create an empty Writable Store.
store = writable()
history = []
unsubscribe = store.subscribe(lambda x: history.append(x))
unsubscribe()
test_eq(history, [None])
If you try to unsubscribe twice, it won’t break. It just does nothing the second time… and in the third time… and…
unsubscribe(), unsubscribe(), unsubscribe()
(None, None, None)
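One way to get this "safe to call repeatedly" behaviour is to check membership before removing the callback. The helper below is a hypothetical sketch of that idea and makes no claim about how sveltish implements it.

```python
def make_unsubscriber(subscribers, callback):
    """Hypothetical helper: return a function that removes callback at most once."""
    def unsubscribe():
        if callback in subscribers:
            subscribers.remove(callback)
    return unsubscribe


subscribers = [print]
unsubscribe = make_unsubscriber(subscribers, print)
first = unsubscribe()
second = unsubscribe()  # finds nothing to remove and simply returns
print(subscribers)  # []
```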
Stores assume mutable objects.
Note
In Python everything is an object. Here, "object" means something that is not a primitive (e.g. int, bool, etc.).
class Bunch:
    __init__ = lambda self, **kw: setattr(self, '__dict__', kw)
obj = Bunch()
called = 0
store = writable(obj)
def callback(x):
    global called
    called += 1
stop = store.subscribe(callback)
test_eq(called, 1)
obj.a = 1 #type: ignore
store.set(obj)
test_eq(called, 2)
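The callback fires again even though the same obj was passed to set. Svelte's own stores behave this way because their change check treats any non-primitive as possibly mutated in place. The function below is a hypothetical Python rendering of that check, written on the assumption that sveltish follows the same rule. The name should_notify and the IMMUTABLE_TYPES tuple are illustrative only.

```python
# Types compared by value in this sketch; everything else counts as "maybe mutated".
IMMUTABLE_TYPES = (int, float, complex, bool, str, bytes, type(None))


def should_notify(old, new):
    """Hypothetical change check, modelled on Svelte's safe_not_equal."""
    if isinstance(new, IMMUTABLE_TYPES):
        return old != new  # primitives: notify only when the value differs
    return True            # other objects may have changed in place


box = {"a": 1}
same_int = should_notify(3, 3)
new_int = should_notify(3, 4)
same_object = should_notify(box, box)
print(same_int, new_int, same_object)  # False True True
```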
However… It is clear that not all stores should be writable by whoever has a reference to them. Many times you want a single publisher of changes to a store that is only consumed (subscribed to) by many other objects. For those cases, we have readable stores.
Note
The Publisher Subscriber (PubSub) pattern is a variant of the Observable/Observer pattern.
from sveltish.stores import readable
A Readable store without a start function is a constant value and has no meaning for us. Therefore, start is a required argument.
try:
    c = readable(0) # should fail
except Exception as error:
    print(error)
test_fail(lambda: readable(0))
readable() missing 1 required positional argument: 'start'
class Publisher:
    def __init__(self): self.set = lambda x: None
    def set_set(self, set):
        self.set = set
        return lambda: None
    def use_set(self, value): self.set(value)
p = Publisher()
reader = readable(0, p.set_set)
reader
r<0> $int: 0
The store only starts updating after the first subscriber. Here, with no subscriber yet, the publisher does not change the store.
p.use_set(1), reader
(None, r<0> $int: 0)
stop = reader.subscribe(lambda x: print(f"reader is now {x}"))
reader is now 0
p.use_set(2)
reader is now 2
stop()
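The start/stop life cycle seen above can be sketched as follows: the start function is called when the first subscriber arrives, and whatever it returns is called when the last one leaves. MiniReadable is a hypothetical class written for this illustration and is not the sveltish implementation.

```python
class MiniReadable:
    """Hypothetical sketch of a lazily started store (not the sveltish code)."""

    def __init__(self, value, start):
        self.value = value
        self.start = start
        self.stop = None  # holds the stop function while the store is running
        self.subscribers = []

    def _set(self, new_value):
        self.value = new_value
        for callback in list(self.subscribers):
            callback(new_value)

    def subscribe(self, callback):
        self.subscribers.append(callback)
        if len(self.subscribers) == 1:
            # First subscriber: hand the setter to the publisher.
            self.stop = self.start(self._set)
        callback(self.value)

        def unsubscribe():
            if callback in self.subscribers:
                self.subscribers.remove(callback)
                if not self.subscribers and self.stop is not None:
                    self.stop()  # last subscriber left: stop the publisher
                    self.stop = None
        return unsubscribe


log = []
publisher = {}


def start(set_value):
    log.append("started")
    publisher["set"] = set_value  # the publisher keeps the setter for later
    return lambda: log.append("stopped")


store = MiniReadable(0, start)
started_before_subscribe = "started" in log
unsubscribe = store.subscribe(log.append)
publisher["set"](5)
unsubscribe()
print(log)  # ['started', 0, 5, 'stopped']
```

Because the setter is only handed out inside start, nothing outside the publisher can write to the store.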
Another example of Readable Store usage:
from threading import Event, Thread
import time
def start(set): # the start function is the publisher
    stopped = Event()
    def loop(): # needs to be in a separate thread
        while not stopped.wait(1): # in seconds
            set(time.localtime())
    Thread(target=loop).start()
    return stopped.set
now = readable(time.localtime(), start)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
Note
The loop needs to be in its own thread, otherwise the function would never return and we would wait forever.
While there is no subscriber, the Readable will not be updated.
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
OhPleaseStop = now.subscribe(lambda x: print(time.strftime("%H:%M:%S", x), end="\r"))
22:12:41
time.sleep(2)
OhPleaseStop()
22:12:43
Note
The Svelte Store API allows you to create a Readable Store without a Notifier. See discussion here.
A Derived Store stores a value based on the value of another store.
from sveltish.stores import derived
For example:
count = writable(1)
stopCount = count.subscribe(lambda x: print(f"count is {x}"))
double = derived(count, lambda x: x * 2)
stopDouble = double.subscribe(lambda x: print(f"double is {x}"))
test_eq(double.get(), 2*count.get())
count is 1
double is 2
count.set(2)
test_eq(double.get(), 4)
count is 2
double is 4
stopCount(), stopDouble()
(None, None)
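Conceptually, a derived store is a store that subscribes to its source and re-applies a function on every change. The sketch below shows that idea with hypothetical names (MiniStore, mini_derived). It is deliberately eager, subscribing to the source immediately, and it compares values with !=, so it should not be read as the sveltish implementation.

```python
class MiniStore:
    """Hypothetical minimal store used only for this illustration."""

    def __init__(self, value):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)
        return lambda: self.subscribers.remove(callback)

    def set(self, new_value):
        if new_value != self.value:
            self.value = new_value
            for callback in list(self.subscribers):
                callback(new_value)


def mini_derived(source, fn):
    """Hypothetical helper: a store whose value is always fn(source value)."""
    result = MiniStore(fn(source.value))
    source.subscribe(lambda x: result.set(fn(x)))  # eager: subscribes right away
    return result


count = MiniStore(1)
double = mini_derived(count, lambda x: x * 2)
seen = []
double.subscribe(seen.append)
count.set(2)
count.set(5)
print(seen)  # [2, 4, 10]
```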
Building on our previous example, we can create a store that derives the elapsed time since the original store was started.
elapsing = None
def calc_elapsed(now):
    global elapsing
    if not elapsing:
        elapsing = now
    return time.mktime(now) - time.mktime(elapsing)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=43, tm_wday=2, tm_yday=67, tm_isdst=0)
elapsed = derived(now, lambda x: calc_elapsed(x))
elapsed
r<0> $float: 0.0
stopElapsed = elapsed.subscribe(lambda x: print(f"Elapsed time of source store: {x} seconds.", end="\r"))
Elapsed time of source store: 0.0 seconds.
time.sleep(1)
stopElapsed()
Elapsed time of source store: 2.0 seconds.
Derived stores allow us to transform the value of a store. In RxPy they are called operators. You can build several operators like: filter, fold, map, zip…
Let’s build a custom filter operator:
user = writable({"name": "John", "age": 32})
stopLog = user.subscribe(lambda x: print(f"User: {x}"))
User: {'name': 'John', 'age': 32}
name = derived(user, lambda x: x["name"])
stopName = name.subscribe(lambda x: print(f"Name: {x}"))
Name: John
user.update(lambda x: x | {"age": 45})
User: {'name': 'John', 'age': 45}
Updating the age does not trigger the name subscriber. Let’s see what happens when we update the name.
user.update(lambda x: x | {"name": "Fred"})
Name: Fred
User: {'name': 'Fred', 'age': 45}
Only changes to the name of the user trigger the name subscriber.
stopName(), stopLog()
(None, None)
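The filtering effect comes from the derived value being unchanged: when only the age changes, the selected name is the same as before, so there is nothing to announce. A minimal sketch of that mechanism, using a hypothetical make_selector helper rather than any sveltish API:

```python
def make_selector(fn, on_change):
    """Hypothetical helper: call on_change only when fn(value) actually changes."""
    last = object()  # sentinel that compares unequal to any selected value

    def receive(value):
        nonlocal last
        selected = fn(value)
        if selected != last:
            last = selected
            on_change(selected)
    return receive


names = []
receive = make_selector(lambda user: user["name"], names.append)
receive({"name": "John", "age": 32})
receive({"name": "John", "age": 45})  # same name: suppressed
receive({"name": "Fred", "age": 45})
print(names)  # ['John', 'Fred']
```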
Another cool thing about Derived Stores is that you can derive from a list of stores. Let’s build a zip operator.
a = writable([1,2,3,4])
b = writable([5,6,7,8])
a,b
(w<0> $list: [1, 2, 3, 4], w<0> $list: [5, 6, 7, 8])
zipper = derived([a,b], lambda a,b: list(zip(a,b)))
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
While zipper has no subscribers, it keeps the initial value; it is stopped.
a.set([4,3,2,1])
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
A subscription starts zipper, and it begins to react to changes in the source stores.
u = zipper.subscribe(lambda x: None)
test_eq(zipper.get(), [(4, 5), (3, 6), (2, 7), (1, 8)])
b.set([8,7,6,5])
test_eq(zipper.get(), [(4, 8), (3, 7), (2, 6), (1, 5)])
u()
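Deriving from several stores can be pictured as subscribing to each source and recomputing from all current values whenever any of them changes. The sketch below uses hypothetical names (MiniStore, mini_derived_many) and, unlike zipper above, is eager rather than started by its first subscriber.

```python
class MiniStore:
    """Hypothetical minimal store used only for this illustration."""

    def __init__(self, value):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)
        return lambda: self.subscribers.remove(callback)

    def set(self, new_value):
        if new_value != self.value:
            self.value = new_value
            for callback in list(self.subscribers):
                callback(new_value)


def mini_derived_many(sources, fn):
    """Hypothetical helper: recompute fn over all sources when any one changes."""
    result = MiniStore(fn(*[s.value for s in sources]))

    def recompute(_changed_value):
        result.set(fn(*[s.value for s in sources]))

    for source in sources:
        source.subscribe(recompute)
    return result


a = MiniStore([1, 2])
b = MiniStore([3, 4])
zipped = mini_derived_many([a, b], lambda x, y: list(zip(x, y)))
initial = zipped.value
a.set([9, 8])
print(initial)       # [(1, 3), (2, 4)]
print(zipped.value)  # [(9, 3), (8, 4)]
```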
writable(1).pipe(lambda x: x + 1).pipe(lambda x: x * 2)
r<0> $int: 4
writable(1).pipe(lambda x: x+1, lambda x: x*2)
r<0> $int: 4
writable(1) | (lambda x: x+1) | (lambda x: x*2)
r<0> $int: 4
a = writable(1)
u5 = (a
| (lambda x: x*2)
| (lambda x: x*2)
| (lambda x: x*2)).subscribe(lambda x: print(f"u5: {x}"))
u5: 8
a.set(2)
u5: 16
u5()
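The three pipe spellings above appear to be equivalent ways of chaining transformations from left to right. As a rough illustration of how a pipe method and the | operator can share one code path, here is a sketch on a plain value wrapper. Piped and compose are hypothetical names, and the wrapper is not a reactive store.

```python
from functools import reduce


def compose(*fns):
    """Hypothetical helper: build a function that applies fns left to right."""
    return lambda value: reduce(lambda acc, fn: fn(acc), fns, value)


class Piped:
    """Hypothetical value wrapper that supports `|` with plain callables."""

    def __init__(self, value):
        self.value = value

    def pipe(self, *fns):
        return Piped(compose(*fns)(self.value))

    def __or__(self, fn):
        return self.pipe(fn)  # `wrapped | fn` delegates to pipe


chained = Piped(1).pipe(lambda x: x + 1).pipe(lambda x: x * 2).value
variadic = Piped(1).pipe(lambda x: x + 1, lambda x: x * 2).value
operator = (Piped(1) | (lambda x: x + 1) | (lambda x: x * 2)).value
print(chained, variadic, operator)  # 4 4 4
```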
You may have noticed that along the way we always had to subscribe and then had to remember to unsubscribe when we were done. This is a bit of a nuisance. Svelte has a compiler that provides some syntactic sugar to make this easier. They call it auto-subscriptions.
Sveltish does not have auto-subscriptions yet. But if you have a nice idea how to implement it, please let me know.
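Until then, one possible way to avoid forgetting the unsubscribe call is a context manager. This is only a sketch and not a sveltish feature: subscribed and TinyStore are hypothetical names, and the helper assumes nothing more than a subscribe method that returns an unsubscriber.

```python
from contextlib import contextmanager


@contextmanager
def subscribed(store, callback):
    """Hypothetical helper: subscribe on entry, unsubscribe on exit."""
    unsubscribe = store.subscribe(callback)
    try:
        yield store
    finally:
        unsubscribe()


class TinyStore:
    """Stand-in store for the demo; any object with a compatible subscribe would do."""

    def __init__(self, value):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)
        return lambda: self.subscribers.remove(callback)

    def set(self, value):
        self.value = value
        for callback in list(self.subscribers):
            callback(value)


seen = []
store = TinyStore(1)
with subscribed(store, seen.append):
    store.set(2)
store.set(3)  # outside the block: no longer observed
print(seen)  # [1, 2]
```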