
JS Task API Examples: composing tasks

Introduction

Task Executor methods take a task function as a parameter. This function is asynchronous and provides access to the ExeUnit object, which is passed to it as one of its parameters.

A task function may be very simple, consisting of a single command, or it may consist of a set of steps that include running commands or sending data to and from providers.
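For illustration, a minimal task function could look like the sketch below (the exe parameter is the ExeUnit instance that the Task Executor passes in; the echoed text is just a placeholder):

const myTask = async (exe) => {
  // a single command executed on the provider; its stdout becomes the task result
  const result = await exe.run("echo 'Hello from the provider'");
  return result.stdout;
};

// the function is then passed to a Task Executor method, e.g. await executor.run(myTask);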

Commands can be run in sequence or can be chained in batches. Depending on how you define your batch, you can obtain results of different types.

The following commands are currently available:

Command          Available in Node.js    Available in web browser
run()            yes                     yes
runAndStream()   yes                     yes
uploadFile()     yes                     no
uploadJson()     yes                     yes
downloadFile()   yes                     no
uploadData()     yes                     yes
downloadData()   no                      yes
downloadJson()   no                      yes

This article focuses on the run() and runAndStream() commands and on chaining commands using the beginBatch() method. Examples for the uploadFile(), uploadJson(), and downloadFile() commands can be found in the Transferring Data article.

We'll start with a simple example featuring a single run() command. Then, we'll focus on organizing a more complex task that requires a series of steps:

  • send a worker.mjs script to the provider (this is a simple script that prints "Hello Golem World!" in the terminal),
  • run worker.mjs on the provider and save the output to a file (output.txt), and finally
  • download the output.txt file back to your computer.

Prerequisites

Yagna service is installed and running with the try_golem app-key configured.
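If the try_golem app-key is not configured yet, the Yagna quickstart typically preconfigures it by starting the daemon with the YAGNA_AUTOCONF_APPKEY environment variable set (a sketch; refer to the Yagna installation article for the exact steps on your platform):

YAGNA_AUTOCONF_APPKEY=try_golem yagna service run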

How to run examples

Create a project folder, initialize a Node.js project, and install libraries.

mkdir golem-example
cd golem-example
npm init
npm i @golem-sdk/task-executor
npm i @golem-sdk/pino-logger

Copy the code into the index.mjs file in the project folder and run:

node index.mjs

Some of the examples require a simple worker.mjs script that can be created with the following command:

echo console.log("Hello Golem World!"); > worker.mjs

Running a single command

Below is an example of a simple script that remotely executes node -v.

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => (await exe.run("node -v")).stdout);
    console.log("Task result:", result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();

Note that exe.run() accepts a string as an argument. This string is a command invocation, executed exactly as one would do in the console. The command will be run in the folder defined by the WORKDIR entry in your image definition.
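For instance, you can check the working directory or combine shell constructs in a single invocation. The fragments below are a sketch to be placed inside a task function, assuming the image provides a standard shell; the exact image contents may differ:

// print the working directory on the provider (set by WORKDIR in the image definition)
console.log((await exe.run("pwd")).stdout);

// the whole string is handed to the shell, so features such as pipes work
console.log((await exe.run("ls -la /golem | head -n 5")).stdout);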

Running multiple commands (prosaic way)

Your task function can consist of multiple steps, all run on the same ExeUnit instance (exe).

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      await exe.uploadFile("./worker.mjs", "/golem/input/worker.mjs");
      await exe.run("node /golem/input/worker.mjs > /golem/input/output.txt");
      const result = await exe.run("cat /golem/input/output.txt");
      await exe.downloadFile("/golem/input/output.txt", "./output.txt");
      return result.stdout;
    });

    console.log(result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();

To ensure the proper sequence of execution, all calls must be awaited. We return only the result of the second run() (the cat command) and ignore the results of the other steps.
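To illustrate why the awaits matter, the sketch below shows what not to do (a hypothetical fragment of a task function):

// DON'T: without await, the upload is only started, not finished,
// so the next command may run before worker.mjs exists on the provider
exe.uploadFile("./worker.mjs", "/golem/input/worker.mjs");
exe.run("node /golem/input/worker.mjs > /golem/input/output.txt");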


If you use this approach, each command is sent separately to the provider and then executed.

[Image: Multiple commands output log]

Organizing commands into batches

Now, let's take a look at how you can arrange multiple commands into batches. Depending on how you finalize your batch, you will obtain either:

  • an array of result objects, or
  • an RxJS Observable.

Organizing commands into a batch resulting in a Promise of an array of results

Use the beginBatch() method and chain commands followed by .end().

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger(),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      return (
        await exe
          .beginBatch()
          .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
          .run("node /golem/input/worker.mjs > /golem/input/output.txt")
          .run("cat /golem/input/output.txt")
          .downloadFile("/golem/input/output.txt", "./output.txt")
          .end()
      )[2]?.stdout;
    });

    console.log(result);
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();

All commands after .beginBatch() are run in a sequence. The chain is terminated with .end(). The output is a Promise of an array of result objects. They are stored at indices according to their position in the command chain (the first command after beginBatch() has an index of 0).

The output of the third command, run("cat /golem/input/output.txt"), is therefore at index 2.
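If indexing into the array feels error-prone, the same result can be picked out by destructuring (a sketch of the fragment inside the task function; the variable names are illustrative):

// each position corresponds to one command in the chain, starting at 0
const [uploadRes, runRes, catRes, downloadRes] = await exe
  .beginBatch()
  .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
  .run("node /golem/input/worker.mjs > /golem/input/output.txt")
  .run("cat /golem/input/output.txt")
  .downloadFile("/golem/input/output.txt", "./output.txt")
  .end();

return catRes?.stdout;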

[Image: Commands batch end output logs]

Organizing commands into a batch producing an Observable

To produce an Observable, use the beginBatch() method and chain commands, followed by endStream().

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      const res = await exe
        .beginBatch()
        .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
        .run("node /golem/input/worker.mjs > /golem/input/output.txt")
        .run("cat /golem/input/output.txt")
        .downloadFile("/golem/input/output.txt", "./output.txt")
        .endStream();

      return new Promise((resolve) => {
        res.subscribe({
          next: (result) => console.log(result),
          error: (error) => console.error(error),
          complete: () => resolve(),
        });
      });
    });
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();

Note that in this case, because the chain ends with .endStream(), we read the result chunks from an Observable, here assigned to res.

Once the stream is completed, we can terminate our TaskExecutor instance.
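If you prefer not to wrap the subscription in a Promise yourself, standard RxJS helpers can do the waiting for you. This is a sketch, assuming the rxjs package is available in your project; lastValueFrom() resolves once the stream completes:

// at the top of the file
import { lastValueFrom, tap } from "rxjs";

// inside the task function, after .endStream():
const lastChunk = await lastValueFrom(res.pipe(tap((chunk) => console.log(chunk))));
return lastChunk?.stdout;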

[Image: Commands batch endStream output logs]

Running commands and collecting output as a stream

Here are two examples of how to run a command and collect its output as a stream.

Basic runAndStream scenario

In the first example, we run a command that produces both stdout and stderr output, which we forward to the console. The command terminates on its own after ten iterations.

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async function main() {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger(),
    api: { key: "try_golem" },
    demand: {
      workload: {
        // What do you want to run
        imageTag: "golem/alpine:latest",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
    task: {
      // Control the execution of tasks
      taskTimeout: 5 * 60 * 1000,
    },
  });

  try {
    let result = await executor.run(async (exe) => {
      console.log("Provider deployed");

      await exe.run(
        `echo 'counter=0; while [ $counter -lt 10 ]; do ls ./home non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );

      await exe.run("chmod 700 ./script.sh");

      let remoteProcess = await exe.runAndStream("/bin/sh ./script.sh");

      remoteProcess.stderr.subscribe((data) => console.error("stderr: ", data));

      await new Promise((resolve) => {
        remoteProcess.stdout.subscribe({
          next: (data) => console.log("stdout: ", data),
          complete: () => resolve(),
        });
      });
    });
    console.log(result);
  } catch (err) {
    console.error("Running the task on Golem failed due to", err);
  } finally {
    await executor.shutdown();
  }
})();

runAndStream scenario with timeout defined

In this example, we show how to use remoteProcess.waitForExit() to limit how long the process can run. Note that in the current implementation, an exit caused by a timeout terminates the activity on the provider, so you cannot run another command on that provider. The Task Executor will instead run the next task on another provider.

import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

const executor = await TaskExecutor.create({
  logger: pinoPrettyLogger(),
  api: { key: "try_golem" },
  demand: {
    workload: {
      imageTag: "golem/alpine:latest",
    },
  },
  market: {
    rentHours: 0.5,
    pricing: {
      model: "linear",
      maxStartPrice: 0.5,
      maxCpuPerHourPrice: 1.0,
      maxEnvPerHourPrice: 0.5,
    },
  },
  task: {
    maxParallelTasks: 1,
  },
});

// the example will run the task 4 times, in sequence (as maxParallelTasks is 1)
for (const i of [1, 2, 3, 4]) {
  await executor
    .run(async (exe) => {
      // each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

      // the command generating the sequence is saved to script.sh file
      await exe.run(
        `echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );
      // permissions are modified to be able to run the script
      await exe.run("chmod 700 ./script.sh");

      // script is run and stream results, stdout and stderr are processed
      let remoteProcess = await exe.runAndStream("/bin/sh ./script.sh");

      remoteProcess.stdout.subscribe((data) => console.log(`iteration: ${i}:`, "stdout>", data));
      remoteProcess.stderr.subscribe((data) => console.error(`iteration: ${i}:`, "stderr>", data));

      // For odd iterations, we set the streaming timeout to 10 secs, so the script ends normally;
      // for even iterations, we set it to 3 secs, so waitForExit() times out.
      // The exit caused by timeout will terminate the activity on a provider,
      // therefore the user cannot run another command on the provider.
      // Task executor will run the next task on another provider.

      const timeout = i % 2 === 0 ? 3_000 : 10_000;
      const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
        console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${exe.provider.name}`);
        exe
          .run("ls -l")
          .catch((e) =>
            console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e),
          );
      });
      if (finalResult) {
        // if the spawn exited without timeout, the provider is still available
        console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${exe.provider.name}`);

        console.log("Running command after normal runAndStream exit is possible:", (await exe.run("ls -l")).stdout);
      }
    })
    .catch((error) => console.error("Execution of task failed due to error.", error));
}

await executor.shutdown();
