Skip to content

All about tasks

Tasks are the basis for scheduling your code, it’s the block that will be run each time it gets triggered. Tasks, of course, must have a certain basic structure that allows the system to hook into your code and run it properly. The basis is a class named JobConsumer that your tasks must inherit from.

here is the basic structure of the JobConsumer class exported as types

declare module "@jobConsumer/jobConsumer" {
import * as BrowserlessService from "@external/browserless";
import GotifyService from "@notifications/gotify";
import { JobDTO, JobLogDTO, JobOptions } from "@types/models/job";
import { exportCacheFiles, exportResultsToFile, getFromCache } from "@utils/jobUtils";
import type { AxiosInstance } from "axios";
import { IScheduleJob, IScheduleJobLog } from "schedule-manager";
const Consumer: typeof import("schedule-manager/dist/Classes/ScheduleJob/Consumer/JobConsumer").default;
export class JobConsumer extends Consumer {
axios: AxiosInstance;
options?: JobOptions;
notification: GotifyService;
browserless: typeof BrowserlessService;
onEnd?: (job: IScheduleJob, jobLog: IScheduleJobLog) => Promise<void>;
notificationServices: {
[key: string]: any;
};
constructor();
getFromCache(...args: Parameters<typeof getFromCache>): Promise<any>;
exportResultsToFile(...args: Parameters<typeof exportResultsToFile>): Promise<void>;
exportCacheFiles(...args: Parameters<typeof exportCacheFiles>): Promise<void>;
private initializeNotificationService;
injectProxies(): Promise<void>;
injectNotificationServices(services: number[]): Promise<void>;
logEvent(data: any, serializer?: (data: any) => any): void;
jobInputParse(job: JobDTO, jobLog: JobLogDTO): {
job: JobDTO;
jobLog: JobLogDTO;
};
preRun(j: JobDTO, jl: JobLogDTO): Promise<{
updateResult: {
success: boolean;
};
jobUpdateResult: {
success: boolean;
};
} | {
success: boolean;
err?: string;
}>;
complete(jobLog: IScheduleJobLog, result: any, error?: string): Promise<{
updateResult: {
success: boolean;
};
jobUpdateResult: {
success: boolean;
};
} | {
success: boolean;
err?: string;
}>;
run(job: IScheduleJob, jobLog: IScheduleJobLog): Promise<{
updateResult: {
success: boolean;
};
jobUpdateResult: {
success: boolean;
};
} | {
success: boolean;
err?: string;
}>;
on(jobName: string): void;
off(jobName: string): void;
serializeLogs(logsData: any, initialLevel?: number, currentLevel?: number): any;
}
}

The JobConsumer class has a host of methods and properties that the user’s code can use and other that are internally used, but can be modified if needed, more on that later. Let’s start by taking a look at the main methods that will be interacting with user’s code.

The run function is the task’s main function and trigger. it will be called by the scheduler on receiving the trigger event.

  • Params:
    • job: The job object from the database
    • jobLog: the instance object, represents the log for this run of the task
  • Returns (optional):
    • Promise(unionReturn): this is the same type as the complete function. although it’s optional, it’s highly recommended to return the result of the complete function at the end of the run function

The complete function is the task’s signal to the scheduler that it’s finished. there is no exact time or place in the task’s code that this should be called, but it’s highly recommended that it’s called once. This degree of freedom helps give more agency to the code to decide when it’s done.

When called, the complete function will determine the final status of the tasks (success, or failure) and will calculate and update the task’s runtime and average runtime

  • Params:
    • jobLog: the instance object, represents the log for this run of the task. it’s the same object that is passed to the runfunction
    • result: the result of the task, this can be any type, but it’s recommended to use a JSONable object, or a string
    • error: the error that occurred during the task, this can be any type, but it’s recommended to use a JSONable object or a string. if this is not null, the task will be marked as failed, even if a result is passed. so result and error are mutually exclusive
  • Returns (optional):
    • Promise(unionReturn): this is the same type as the run function needs to return. The complete function will return an object with a boolean attribute success, if this is set to false, this means that the task completion has failed and the function couldn’t update the task’s instance metrics and status, this issue should be handled by the task’s code.

This is the main logging function, you can use it to log single message or objects. the logs will be propagated to Loki with labels for the task and the task instance, that would help with granular extraction of logs. the logs, if enabled, will also be saved to files.

  • Params:
    • data: the data to log, this can be any type, but it’s recommended to use a JSONable object or a string

    • serializers: an optional function that will convert the data you passed into a string, if not passed the default serializer will be used. the default serializer handle multiple cases and will attempt to convert objects to strings until a certain depth is reached, the depth can only be passed to the default serializer function, so extending that or passing it manually when calling logEvent is the only way :

      this.logEvent(<nestedObject>, (m) => this.serializeLogs(m, <depthNumber>))
  • Returns:
    • void

this function is an async executor and takes no params, it will inject the proxies you attached to your task (via the API/ UI) into your http client (axios) instance. This means that the task’s code should decide on when to inject the proxies, once injected the proxies will be available for all calls. This might be updated to allow injection of specific proxies t a specific instance, …

this function takes no params and returns Promise(void)

A general function for exporting files from a running task. this will take a JSON object as input and save it to a file with the specified name. The record of the file export will also be saved to the database and attached to the running instance and the task itself. you can later preview, delete or download the file.

  • Params:

    • results: the data you want to save to the file, Must be a JSON object
    • fileName: the file name for the export. should not include extension
    • job_log_id: the task instance id, can be retrieved from the input job object passed to the run function
    • tags (optional): should be an array of strings that will be used to tag the file, comes in handy when searching through files
    • type (optional): The fil type, by default it’s json, but this is just an extra tag, for now, and it has no extra functionality attached to it
    • newFile (optional): if set to true, this will append a random number of the end of the file name to avoid overriding the same file name
  • Returns:

    • void

A general function to export limited timed files, files that will expire after a certain amount of time and will not be returned when reading it using the getFromCache function.

  • Params:
    • data: the data you want to save to the file, Must be a JSON object
    • fileName: the file name for the export. should not include extension
    • job_log_id: the task instance id, can be retrieved from the input job object passed to the run function
    • tags (optional): should be an array of strings that will be used to tag the file, comes in handy when searching through files
    • type (optional): The fil type, by default it’s json, but this is just an extra tag, for now, and it has no extra functionality attached to it
    • ttl (optional): the time to live for the file, in seconds, by default it’s 24h
    • newFile (optional): if set to true, this will append a random number of the end of the file name to avoid overriding the same file name

An async function to retrieve cached files, if the file has expired, this will resolve to a null value signaling that the cache is void. the function will return the file content only in a string format

  • Params:

    • fileName: the file name for the export. this MUST include the extension
  • Returns:

    • a promise that resolves to the file content as a string

An instance of axios http client, this is the main http client for the tasks to use when making http requests. this instance is also the default target of embedded proxy injection and any network metrics that are recorded by the system.

An object with a list of system options passed to the task, here is representation of the options object

JobOptions {
utils?: typeof JobConsumerUtils {
exportCacheFiles, // <-- This is redundant version of the same function above
exportResultsToFile, // <-- This is redundant version of the same function above
getNextJobExecution, // <-- This is a utility function to get the next cronjob execution time by passing the cronSetting
sleep, // <-- An async sleep function with the time to sleep for passed in milliseconds
}
config?: { [key: string]: any }; // <--This is the entire system configuration object, including but not limited to the .env configurations
// this includes has the DECRYPTED sensitive configuration values that are usually encrypted when saved to the database
}
  • The full configuration passed the options above is available on github config.ts file
  • getNextJobExecution def:
    • Params:
      • cronSetting: the cron setting for the job in string format
    • Returns:
      • Date object or null if the cron setting is not valid
  • sleep def:
    • Params:
      • seconds: the number of seconds to sleep for
    • Returns:
      • Promise(void)

An instance of the default notification service, right now we are still using Gotify as the default notification service. The notification service is a base interface that cna be used on custom services. The GotifyService implements this interface and implements extra methods to send notifications on task completion and crashes :

  • sendMessage: this is a generic send message method and would do a http call to the Gotify server, this WILL NOT save the notification to the database :

    • Params:
      • message: the message to send
      • title: the title of the message
    • Returns:
      • Promise(AxiosResponse)
  • sendBaseMessage this is a generic send-message method that will save the notification data to the database :

    • Params:
      • body: the message to send
        • title: the notification title
        • message: the notification message body
      • options: axios options to pass with teh http call (used for extra authentication, quick change of authentication keys, etc)
    • Returns:
      • Promise(AxiosResponse)
  • sendJobFinishNotification this method will send a job finish notification, this function IS NOT called automatically. Same reasoning as the complete method, so the task code needs to call this method:

    • Params:
      • jobId: The job id (from the job object passed to the run method)
      • jobName: The job name (from the job object passed to the run method),
      • results: The job result, this is manually set so it can be different from the complete method result and relies on the task code to handle,
      • options (optional): an object that cna be used to force an exact title and message and change the Gotify priority levels
    • Returns:
      • Promise(AxiosResponse)
  • sendJobCrashNotification same as the sendJobFinishNotification method above, this method will send a notification but about a job crash, the message and title are generated differently with clear mention that this is a crash. this method IS called automatically on crashes of the run method, the exact behaviour is defined in the preRun method definition

    • Params:
      • jobId: The job id (from the job object passed to the run method)
      • jobName: The job name (from the job object passed to the run method),
      • error: the error in a string format,
      • options (optional): an object that cna be used to force an exact title and message and change the Gotify priority levels
    • Returns:
      • Promise(AxiosResponse)

the browserless object is a set of functions that mimic a regular http client but with more capabilities all based on the features ofr the browserless project. This might be removed in favor of an external service implementation in the future.

this object exposes the following methods:

  • get this will act as a simple get request and will return the direct content of your target page (html, text, etc)

    • Params:
      • url: required, the url of the target web page
      • extraConfig: optional, extra misc configuration that will be passed to the browserless instance
        • params (optional): the url query params to pass along the main url
        • scrape (optional): when set to true, will trigger the scrape function of the browserless instance
        • elements (optional): an array of xPATH string, when scrape is set to true, the scraping will use this list of element.
    • Returns: AxiosResponse
      • the content of the target page as a string
  • getJson this function will read JSON content from a target webpage, and return it’s content as well as the response headers. this is done via the function feature of browserless. this is useful as this function will force the JSON content type when making the request.

    • Params:
      • url: required, the url of the target web page
      • extraConfig: optional, extra misc configuration that will be passed to the browserless instance
        • params (optional): the url query params to pass along the main url
        • scrape (optional): when set to true, will trigger the scrape function of the browserless instance
        • elements (optional): an array of xPATH string, when scrape is set to true, the scraping will use this list of element.
        • timeout (optional): the number timeout in milliseconds for the request
        • headers (optional): an object representing the headers to pass to the request to the target url
    • Returns: AxiosResponse
      • a json object :
        • headers: the object of the response headers
        • content: the JSON content of the target page

this object exposes the set of notification services that are attached to this job, the keys represent the names set for the service on creation and the values are the service instances. for example, if you attach the default service gotify to the job via the API or the UI notification page, it will be available in this object as follows :

notificationServices = {
'gotify': gotifyServiceInstance
}

The services are initialized before the run command is called, any failure in initialization of the attached services, will result in the task crashing.