Skip to main content

First Collektive Program

Device Counter

Let's create a program where each device learns the total number of devices in the network.

We can use the countDevices function from the standard library, which relies on a hopDistance metric. This function is based on the convergeCast operation, which aggregates values towards the root device in the network.

/**
* Count the number of devices in the network.
* The total is accumulated in the device with [localId] 0 (the [sink]).
* Other devices hold the number of devices in their subtree towards the network edge, inlcuding themselves.
* A leaf node (with no outward neighbors) will hold 1.
*/
fun Aggregate<Int>.countDevicesEntrypoint(): Int = countDevices(sink = localId == 0)
note

With our goal in mind, this implementation has a limitation: only the root device will know the number of devices in the network.

We now need to propagate the information accumulated by the root device to all other devices in the network. How? By broadcasting the value with the gradientCast function, which propagates it across multiple spanning trees, starting from the root.

/**
* Broadcast from the [source] the number of devices connected to the network.
* Requires a [distances] field to be used as metric for computing the distance from the source to the target.
*/
fun Aggregate<Int>.broadcastCountDevices(distances: Field<Int, Double>, source: Boolean): Int =
gradientCast(
metric = distances,
source = source,
local = countDevices(sink = source),
)

The following is the program that each device will execute:

/**
* [collektiveDevice] represents the device running the Collektive program.
* It is used to access the device's properties and methods,
* such as the [distances] method, which returns a field of distances from the neighboring nodes.
* In this case, the source is the device with [localId] equal to 0.
*/
fun Aggregate<Int>.broadcastCountDevicesEntrypoint(collektiveDevice: CollektiveDevice<*>): Int =
broadcastCountDevices(
distances = with(collektiveDevice) { distances() },
source = localId == 0,
)

Now all the devices in the network will know how many devices are currently connected.

caution

What happens if a group of devices or the root gets disconnected from the network, or the network becomes segmented?

In such a case, the subnetwork that still contains the root device will continue to compute the updated number of devices. However, any subnetworks that no longer include the root will not be able to do so.

How can we address this issue? We can use a leader election algorithm, which allows us to dynamically elect a new root device within each connected component of the network. In this tutorial, we use the boundedElection function from the standard library.

/**
* Counts and broadcast the number of devices in the network using a leader election.
* If the network is segmented, each connected component elects a leader to act as root.
*/
fun Aggregate<Int>.broadcastCountDevicesWithLeaderElectionEntrypoint(
collektiveDevice: CollektiveDevice<*>,
): Int {
val leaderId = boundedElection(bound)
return broadcastDevices(
distances = with(collektiveDevice) { distances() },
source = localId == leaderId,
)
}
Success

Now we're all set! The final program will recompute the number of devices in the network, even if the network becomes segmented.

info

The code above can be found and executed in the collektive examples repository.