Service Example 4: Custom usage counters
Introduction
The example depicts the following features:
- Service execution
- Dedicated service runtime implemented with ya-runtime-sdk
- Custom usage counter
Full code of the example is available in the following locations:
- The requestor agent: https://github.com/golemfactory/yapapi/tree/master/examples/custom-usage-counter
- The custom runtime implementing a custom usage counter (using
ya-runtime-sdk
): https://github.com/golemfactory/ya-test-runtime-counters
Prerequisites
As with the other examples, we're assuming here you already have your yagna daemon set-up to request the test tasks and that you were able to configure your Python environment to run the examples using the latest version of yapapi
. If this is your first time using Golem and yapapi, please first refer to the resources linked above.
Overview
The application consists of two components:
Custom runtime
A dedicated, self-contained runtime created with ya-runtime-sdk. It is a Rust binary which illustrates the API used to update the value of a custom usage counter.
Requestor agent
A Python application developed using
yapapi
. It instantiates the runtime on a Provider and periodically fetches the current state of usage counters to demonstrate fetching of custom usage counter updates.
Custom runtime implementation
The implementation of the sample runtime with a custom usage counter can be found here: https://github.com/golemfactory/ya-test-runtime-counters/blob/main/src/main.rs.
The runtime includes a metric_reporter()
function which periodically increments a value of a counter named golem.usage.custom.counter
, then notifies the runtime about this new value via anEventEmitter
:
const COUNTER_NAME: &'static str = "golem.usage.custom.counter";
const INTERVAL: Duration = Duration::from_secs(2);
...
async fn metric_reporter(mut emitter: EventEmitter) {
let mut value = 10f64;
loop {
tokio::time::delay_for(INTERVAL).await;
value += 1f64;
emitter
.counter(RuntimeCounter {
name: COUNTER_NAME.to_string(),
value,
})
.await;
}
}
Note how RuntimeCounter
struct is used to pass the value of a counter using EventEmitter
's counter()
method. The RuntimeCounter instance will be propagated by the ExeUnit to the Provider agent (for the purposes of pricing and invoicing) and to Requestor side where it can be fetched by the Requestor agent's code.
The remaining code includes an implementation of the ExampleRuntime
, which is fairly simple, as it includes some non-void implementations of start()
and run_command()
actions.
In a real-world scenario, the custom counter should probably refer to some other aspect of the execution - e.g. occupied storage space or maybe the number of requests made to the service running on the provider's end.
That's because we already have two other counters that refer to time spent on execution available out of the box - one based on the wall clock time registering the time elapsed since the activity has been started (com.Counter.TIME
) and another based on the actual CPU execution time (com.Counter.CPU
).
start()
The start()
action launches a local thread running the metric_reporter()
passing anEventEmitter
instance cloned from runtime's Context
:
fn start<'a>(&mut self, ctx: &mut Context<Self>) -> OutputResponse<'a> {
let emitter = match ctx.emitter.clone() {
Some(emitter) => emitter,
None => {
let err = anyhow::anyhow!("not running in server mode");
return futures::future::err(err.into()).boxed_local();
}
};
let (handle, reg) = AbortHandle::new_pair();
tokio::task::spawn_local(Abortable::new(metric_reporter(emitter.clone()), reg));
self.handle = Some(handle);
...
}
This is then followed by a one-off, explicit initiation of the golem.usage.custom.counter
counter value.
run_command()
The run_command()
implements two explicitly named commands which can be triggered by calling a RUN <command> ExeScript call. The commands are:
sleep n
- forces the runtime to waitn
milliseconds.stop
- causes the runtime shutdown.
fn run_command<'a>(
&mut self,
command: RunProcess,
_mode: RuntimeMode,
ctx: &mut Context<Self>,
) -> ProcessIdResponse<'a> {
ctx.command(|mut run_ctx| {
async move {
match command.bin.as_str() {
"sleep" => {
let delay_str = command
.args
.get(1)
.ok_or_else(|| anyhow::anyhow!("Missing delay value"))?;
let delay_ms: u64 = delay_str.as_str().parse()?;
let delay = Duration::from_millis(delay_ms);
run_ctx
.stdout(format!("Entering sleep for {}ms", delay_ms))
.await;
tokio::time::delay_for(delay).await;
run_ctx.stdout("Done sleeping").await;
}
"stop" => {
run_ctx.stdout("Stopping runtime").await;
run_ctx.control().shutdown();
}
_ => {
anyhow::bail!("Unsupported command {} {:?}", command.bin, command.args);
}
}
Ok(())
}
.map_err(Into::into)
})
}
The example runtime implementation comes complete with ya-test-runtime-counters.json
config file, which includes metadata for the runtime, required to plug it into a yagna provider service, under the name test-counters
:
;[
{
name: 'test-counters',
version: '0.1.0',
'supervisor-path': 'exe-unit',
'runtime-path': 'ya-test-runtime-counters/ya-test-runtime-counters',
description:
'Yagna runtime supporting custom usage counters. For testing purposes only.',
'extra-args': ['--runtime-managed-image'],
config: {
counters: {
'golem.usage.custom.counter': {
name: 'Custom',
description: 'Custom counter',
price: true,
},
},
},
},
]
Note how config.counters
structure is used to specify the custom usage counter metadata.
Plugging the runtime into golemsp
In the $HOME/.local/lib/yagna/plugins/
directory create:
file
ya-test-runtime-counters.json
where you describe the plugin:;[ { name: 'test-counters', version: '0.1.0', 'supervisor-path': 'exe-unit', 'runtime-path': 'ya-test-runtime-counters/ya-test-runtime-counters', description: 'Custom usage counter example runtime', 'extra-args': ['--runtime-managed-image'], }, ]
directory
ya-test-runtime-counters
(compareruntime-path
in above file) whereya-test-runtime-counters
binary along with Erigon binaries are placed.
The new runtime also needs to be enabled in $HOME/.local/share/ya-provider/presets.json
. The preset object can be copied from other presets. Please note that exeunit-name
has to match to the name
property of the plugin above:
{
"active": [
"test-counters",
...
],
"presets": [
{
"name": "test-counters",
"exeunit-name": "test-counters",
"pricing-model": "linear",
"initial-price": 0,
"usage-coeffs": {
"golem.usage.duration_sec": 0.0001,
"golem.usage.cpu_sec": 0.0001,
"golem.usage.custom.counter": 0.0003
}
},
Note how in the example above the golem.usage.custom.counter
is to be included in the pricing function (linear model) with a coefficient of 0.0003 GLM.
Requestor agent
The requestor agent is a fairly simple implemenation of a Golem service which:
- Requires a
test-counters
runtime as payload:
@dataclass
class CustomCounterServicePayload(Payload):
runtime: str = constraint(inf.INF_RUNTIME_NAME, default="test-counters")
- In the
run()
handler, it periodically fetches the usage vector as published by the ExeUnit alongside the accumulated cost and displays it in console:
async def run(self):
start_time = datetime.now()
print(f"service {self.id} running on '{self.provider_name}'...")
while datetime.now() < start_time + timedelta(seconds=self._running_time_sec):
script = self._ctx.new_script()
script.run("sleep", "1000")
yield script
usage: ActivityUsage = await self._ctx.get_usage()
cost = await self._ctx.get_cost()
print(f"total cost so far: {cost}; activity usage: {usage.current_usage}")
await asyncio.sleep(3)
Launching the service
To launch the service, we first need to initialize Golem
:
async with Golem(
budget=10.0, subnet_tag=subnet_tag, driver=driver, network=network, strategy=strategy
) as golem:
instance_params = [{"running_time_sec": running_time_sec}]
Once created, we use it to instantiate a CustomCounterService
:
cluster = await golem.run_service(CustomCounterService, instance_params=instance_params)
And that's all there is to it. Running the example should result with a sequence of messages appearing in console, showing the golem.usage.custom.counter
counter being periodically incremented.
- The next article takes a close look at webapp example.
Was this helpful?