Notes while reading Leaf-0.3 code
Current goals:
to see how the ULID (stamp) can be sent to the client when connecting to a space
to see how the server handles creating a space
to try to run the leaf server locally
to see how space membership (and other state) is tracked on the leaf server
leaf-stream
A StreamGenesis seems a bit like a builder: it exists before a Stream does, and is hashed to produce the Stream ID. The ULID is essentially functioning as a nonce/IV.
/// The genesis configuration of an event stream.
#[derive(Encode, Decode, Debug)]
pub struct StreamGenesis {
    /// A ULID, which encompasses the timestamp and additional randomness, included in this stream
    /// to make its hash unique.
    ///
    /// Note that this is not the stream ID, which is computed from the hash of the
    /// [`GenesisStreamConfig`].
    pub stamp: Encodable<Ulid>,
    /// User ID of the user that created the stream.
    pub creator: String,
    /// The hash of the WASM module that will be used for filtering.
    pub module: Encodable<Hash>,
    /// The parameters to supply to the WASM module.
    pub params: Vec<u8>,
}
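To convince myself of the "hashed to produce the Stream ID" idea, here is a rough std-only sketch. GenesisSketch and stream_id are made-up names, the field types are guesses, and DefaultHasher is only a stand-in for whatever content hash leaf-stream really uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for `StreamGenesis`; the field types are assumptions.
#[derive(Hash, Clone, Debug)]
pub struct GenesisSketch {
    /// 16 ULID bytes, acting as the nonce.
    pub stamp: [u8; 16],
    pub creator: String,
    /// Hash of the WASM module (32 bytes assumed).
    pub module: [u8; 32],
    pub params: Vec<u8>,
}

/// Derive an ID by hashing every genesis field.
/// `DefaultHasher` is a placeholder, not the hash the real code uses.
pub fn stream_id(genesis: &GenesisSketch) -> u64 {
    let mut hasher = DefaultHasher::new();
    genesis.hash(&mut hasher);
    hasher.finish()
}
```

Two genesis values that differ only in stamp get different IDs, which is the nonce role. The flip side is that identical fields, stamp included, always hash to the same ID.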
We want the ULID to become accessible by users on the client. It could be that the ULID is generated by the client, like other entities are. This would mean that clients can create and begin using spaces while completely offline. If they can't generate an entity ID themselves, they don't have anything stable with which to refer to it.
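For reference, a ULID is 128 bits: a 48-bit millisecond timestamp followed by 80 bits of randomness, big-endian. A minimal sketch of how a client could build one offline, with hypothetical function names and the randomness passed in so the example stays std-only:

```rust
/// Pack a 48-bit millisecond timestamp and 80 bits of randomness into
/// the 16-byte big-endian layout described by the ULID spec.
pub fn ulid_bytes(timestamp_ms: u64, randomness: u128) -> [u8; 16] {
    // Keep only the low 48 bits of the timestamp, then move them to the top.
    let time = ((timestamp_ms as u128) & 0xFFFF_FFFF_FFFF) << 80;
    // Keep only the low 80 bits of the supplied randomness.
    let rand = randomness & ((1u128 << 80) - 1);
    (time | rand).to_be_bytes()
}

/// Recover the millisecond timestamp from ULID bytes.
pub fn ulid_timestamp_ms(bytes: [u8; 16]) -> u64 {
    (u128::from_be_bytes(bytes) >> 80) as u64
}
```

Because the timestamp occupies the most significant bytes, ULIDs created later compare greater byte-for-byte. A real client would take the randomness from a proper random number generator.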
On the other hand, it's possible that creating a space has more overhead on the server than other events, so it's worth thinking about how we might rate limit space creation to prevent abuse, particularly if clients are empowered to potentially create a million spaces offline.
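One possible shape for such a limit, purely as a thought experiment: a fixed-window counter keyed by creator DID. Nothing like this exists in the code I have read so far; CreationLimiter and allow are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical per-creator limiter: at most `max_per_window` creations
/// in each fixed window of `window_ms` milliseconds.
pub struct CreationLimiter {
    max_per_window: u32,
    window_ms: u64,
    /// creator DID -> (window start, creations counted in that window)
    windows: HashMap<String, (u64, u32)>,
}

impl CreationLimiter {
    pub fn new(max_per_window: u32, window_ms: u64) -> Self {
        Self { max_per_window, window_ms, windows: HashMap::new() }
    }

    /// Returns true if `creator` may create another space at `now_ms`.
    pub fn allow(&mut self, creator: &str, now_ms: u64) -> bool {
        let entry = self
            .windows
            .entry(creator.to_string())
            .or_insert((now_ms, 0));
        // Start a fresh window once the previous one has elapsed.
        if now_ms.saturating_sub(entry.0) >= self.window_ms {
            *entry = (now_ms, 0);
        }
        if entry.1 < self.max_per_window {
            entry.1 += 1;
            true
        } else {
            false
        }
    }
}
```

If spaces can be created offline, now_ms would presumably have to be the server's receive time rather than the timestamp inside the client's ULID, since the client controls the latter.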
Let's just focus on getting the ULID out for now.
How does the server serve streams?
I know leaf-stream is used by leaf-server, but exactly how streams are served isn't clear yet. leaf-server/src/http.rs has a Router. It seems to use salvo, a web framework a bit like axum that supports WebSockets, but how does SocketIO come into it?
let (layer, io) = SocketIo::builder()
    .with_parser(ParserConfig::msgpack())
    .build_layer();

// This code is used to integrate other tower layers before or after Socket.IO such as CORS
// Beware that classic salvo request won't pass through these layers
let layer = ServiceBuilder::new()
    .layer(CorsLayer::permissive()) // Enable CORS policy
    .layer(layer); // Mount Socket.IO

// TODO: add richer request information to tracing.
let router = Router::new()
    .push(Router::with_path(".well-known/did.json").get(did_endpoint))
    .push(Router::with_path("/socket.io").goal(layer.compat()))
    .push(Router::with_path("/xrpc/space.roomy.token.v0").post(token_endpoint))
    .push(Router::new().get(http_index))
    .hoop(Logger::new());

let server = Server::new(acceptor);
let handle = server.handle();

io.ns("/", socket_io_connection);

tokio::spawn(async move {
    EXIT_SIGNAL.wait_for_exit_signal().await;
    handle.stop_graceful(Some(Duration::from_secs(5)));
});
So io.ns("/", socket_io_connection); seems to be the critical point here. We're using socketioxide here. At first this confused me: SocketIo::builder() should return a struct, not a tuple. Oh wait, I missed the chained methods on the following lines. It configures a parser, then builds. build_layer() apparently returns "a SocketIoLayer and a SocketIo instance that can be used as a tower_layer::Layer". What is a layer?
Often, many of the pieces needed for writing network applications can be reused across multiple services. The Layer trait can be used to write reusable components that can be applied to very different kinds of services; for example, it can be applied to services operating on different protocols, and to both the client and server side of a network transaction. (tower-layer)
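To make that concrete for myself, here is a toy version of the idea. These are simplified, synchronous stand-ins that I wrote for illustration; the real tower Service and Layer traits are asynchronous and generic over the request type, and Echo, TagLayer and Tagged are invented names.

```rust
/// Simplified stand-in for tower's `Service`: turns a request into a response.
pub trait Service {
    fn call(&self, request: &str) -> String;
}

/// Simplified stand-in for tower's `Layer`: wraps one service in another.
pub trait Layer<S> {
    type Wrapped: Service;
    fn layer(&self, inner: S) -> Self::Wrapped;
}

/// Innermost service: reports which request it handled.
pub struct Echo;

impl Service for Echo {
    fn call(&self, request: &str) -> String {
        format!("handled {}", request)
    }
}

/// A layer that tags every response, loosely like a CORS layer adding headers.
pub struct TagLayer(pub &'static str);

/// The service produced by `TagLayer`.
pub struct Tagged<S> {
    inner: S,
    tag: &'static str,
}

impl<S: Service> Service for Tagged<S> {
    fn call(&self, request: &str) -> String {
        // Delegate to the wrapped service, then decorate its response.
        format!("{} [{}]", self.inner.call(request), self.tag)
    }
}

impl<S: Service> Layer<S> for TagLayer {
    type Wrapped = Tagged<S>;
    fn layer(&self, inner: S) -> Tagged<S> {
        Tagged { inner, tag: self.0 }
    }
}
```

In this toy, TagLayer("cors").layer(Echo) yields a service that behaves like Echo plus the tag, and layers can be stacked. As far as I can tell, that is the same shape as the ServiceBuilder chain in http.rs.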
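Sooo... basically it seems like salvo handles the ordinary HTTP routes, while requests to /socket.io are handed to the socketioxide layer. The "/" in io.ns("/", socket_io_connection) is the Socket.IO namespace rather than a route, so socket_io_connection runs for clients connecting to the default namespace.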
That function authenticates by matching any valid auth token to a DID, then calls connection::setup_socket_handlers(&socket, did). This is where the meat of the socket handling happens.
We first keep track of open streams and 'unsubscribers', which I am guessing are callbacks for cleaning up when socket connections are closed.
let open_streams = Arc::new(RwLock::new(HashMap::new()));
let unsubscribers = Arc::new(Mutex::new(HashMap::new()));
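If the guess about unsubscribers is right, the pattern would look roughly like this. This is a std-only sketch with invented types: the real code awaits its lock, so it presumably uses async locks, and I have not yet checked what the maps actually hold.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical cleanup callback, run when a subscription ends.
pub type Unsubscriber = Box<dyn FnOnce() + Send>;

/// Per-connection state, shared between handlers through `Arc`.
#[derive(Clone, Default)]
pub struct ConnectionState {
    /// stream ID -> open stream (a plain name stands in for the real handle)
    pub open_streams: Arc<RwLock<HashMap<u64, String>>>,
    /// stream ID -> cleanup callback for that subscription
    pub unsubscribers: Arc<Mutex<HashMap<u64, Unsubscriber>>>,
}

impl ConnectionState {
    /// Record an open stream together with the callback that cleans it up.
    pub fn subscribe(&self, stream_id: u64, name: &str, cleanup: Unsubscriber) {
        self.open_streams
            .write()
            .unwrap()
            .insert(stream_id, name.to_string());
        self.unsubscribers.lock().unwrap().insert(stream_id, cleanup);
    }

    /// Run every pending cleanup, e.g. when the socket disconnects.
    /// Returns how many callbacks ran.
    pub fn close(&self) -> usize {
        // Take the callbacks out first so the lock is released before they run.
        let pending: Vec<Unsubscriber> = self
            .unsubscribers
            .lock()
            .unwrap()
            .drain()
            .map(|(_, cleanup)| cleanup)
            .collect();
        let count = pending.len();
        for cleanup in pending {
            cleanup();
        }
        self.open_streams.write().unwrap().clear();
        count
    }
}
```

Cloning ConnectionState only clones the Arc pointers, which is what would let each socket.on handler own a copy while sharing the same maps.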
Then we call socket.on() with handlers for 8 different event types:
wasm/upload
wasm/has
stream/create
stream/event
stream/event_batch
stream/subscribe
stream/unsubscribe
stream/fetch
Every event type has a corresponding struct, which describes the expected shape of data for that event. The handler parses the event content as that struct. For example:
#[derive(Deserialize)]
struct StreamCreateArgs {
    /// The hex-encoded, blake3 hash of the WASM module to use to create the stream.
    module: String,
    params: Vec<u8>,
}
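Since module arrives as a plain string, something still has to turn it into hash bytes. A blake3 digest is 32 bytes, so the hex form is 64 characters. Below is a minimal std-only sketch of that validation; parse_module_hash is a hypothetical helper, not the function the server calls.

```rust
/// Decode a 64-character hex string into the 32 bytes of a digest.
/// Accepts upper- and lower-case digits; rejects anything else.
pub fn parse_module_hash(hex: &str) -> Result<[u8; 32], String> {
    if hex.len() != 64 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err("module must be exactly 64 hex characters".to_string());
    }
    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        // Each output byte comes from two consecutive hex digits.
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16)
            .map_err(|e| format!("invalid hex at byte {}: {}", i, e))?;
    }
    Ok(out)
}
```

A malformed module string would fail here, before any stream is created.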
So I know that the WASM module is meant to be a filter for incoming events, or something like that. What's params though? Here we go:
socket.on(
    "stream/create",
    async move |TryData::<StreamCreateArgs>(data), ack: AckSender| {
        let result = async {
            // Create the stream
            let input = data?;
            let genesis = StreamGenesis {
                stamp: Ulid::new().into(),
                creator: did_.clone(),
                module: Hash::from_hex(input.module)?.into(),
                params: input.params,
            };
            let stream_id = STORAGE.create_stream(genesis).await?;
            anyhow::Ok(stream_id)
        }
        .instrument(tracing::info_span!(parent: span_.clone(), "handle stream/create"))
        .await;

        match result {
            Ok(stream) => {
                let id = stream.id();
                open_streams_.write().await.insert(id, stream);
                ack.send(&json!({ "streamId": id.to_hex().as_str() }))
            }
            Err(e) => ack.send(&json!({ "error": e.to_string()})),
        }
        .log_error("Internal error sending response")
        .ok();
    },
);
So we still don't know what params is, but we know it gets passed into StreamGenesis. Oh wait, we already know from the start of this post, it's "The parameters to supply to the WASM module". Ok! Not going to worry about it for now!
I want to know what gets sent back. STORAGE.create_stream(genesis) looks like the call that actually creates the stream; open_streams_.write().await.insert(id, stream); then just records it in this connection's map of open streams. ack.send(&json!({ "streamId": id.to_hex().as_str() })) is the JSON acknowledgement/response, including the streamId. Maybe this is where we can send the ULID as well!
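To pin down the response shape, here is a std-only sketch that builds the two possible acknowledgement bodies by hand. The "stamp" field is the hypothetical addition. Encoding it as hex is just the simplest choice for the sketch, since ULIDs are more commonly shown as 26-character base32 strings. create_ack, to_hex and escape_json are made-up helpers.

```rust
/// Hex-encode bytes in lowercase.
pub fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

/// Escape the characters that would break a JSON string literal.
fn escape_json(text: &str) -> String {
    let mut out = String::new();
    for c in text.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build the acknowledgement body for `stream/create`.
/// On success the input is (stream ID bytes, ULID bytes).
/// The "stamp" field is hypothetical: it is the proposed addition.
pub fn create_ack(result: Result<(&[u8], &[u8; 16]), String>) -> String {
    match result {
        Ok((stream_id, stamp)) => format!(
            "{{\"streamId\":\"{}\",\"stamp\":\"{}\"}}",
            to_hex(stream_id),
            to_hex(stamp)
        ),
        Err(message) => format!("{{\"error\":\"{}\"}}", escape_json(&message)),
    }
}
```

In the real handler this would just be one more key inside the existing json! call; the point of the sketch is only the shape the client would see.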
Soo to let the client supply the ULID instead, all we should need to do is update StreamCreateArgs to include one:
#[derive(Deserialize)]
struct StreamCreateArgs {
    /// The hex-encoded, blake3 hash of the WASM module to use to create the stream.
    module: String,
    params: Vec<u8>,
    ulid: [u8; 16],
}
Then update the handler to use it:
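let genesis = StreamGenesis {
    // Use the client-supplied ULID instead of generating one on the server.
    stamp: Ulid::from_bytes(input.ulid).into(),
    // Clone as the original handler does, since the handler can run more than once.
    creator: did_.clone(),
    module: Hash::from_hex(input.module)?.into(),
    params: input.params,
};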