
JavaScript processor unable to handle asynchronous code execution #2623

Open
lunacrafts opened this issue May 31, 2024 · 1 comment
Labels
enhancement processors Any tasks or issues relating specifically to processors

Comments

@lunacrafts

The JavaScript processor cannot run asynchronous code. The following example demonstrates the problem:

    this.pipeline = new k8s.helm.v3.Release(
      namespace.get('pipeline'),
      {
        namespace: args.namespace,
        name: args.metadata.name,
        createNamespace: true,
        repositoryOpts: {
          repo: 'https://benthosdev.github.io/charts/',
        },
        chart: 'benthos',
        version: '2.2.0',
        values: {
          config: {
            input: {
              generate: {
                interval: '1s',
                count: 0,
                mapping: `
                  root.id = uuid_v4()
                  root.message = "Heyxx!!!"
                  root.timestamp = now()
                `
              }
            },
            pipeline: {
              processors: [
                {
                  label: 'asyncjs',
                  javascript: {
                    code: `(async () => await fetch('http://xxx') )()`
                  }
                }
              ]
            },
            output: {
              stdout: {
                codec: 'lines'
              }
            }
          }
        }
      },
      {
        parent: this,
      },
    );

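It appears that the RunProgram method, which the processor currently uses to execute JavaScript, runs each script to completion without an event loop. As a result, asynchronous operations that wait on timers or network requests (such as fetch) have nothing to drive them to completion. To support asynchronous code properly, an event loop would need to be integrated.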

Here is the relevant code in the processor:
https://github.com/redpanda-data/connect/blob/main/internal/impl/javascript/vm.go#L133

func (r *vmRunner) Run(ctx context.Context, batch service.MessageBatch) (service.MessageBatch, error) {
    defer r.reset()

    var newBatch service.MessageBatch
    for i := range batch {
        r.reset()
        r.runBatch = batch
        r.targetIndex = i
        r.targetMessage = batch[i]

        _, err := r.vm.RunProgram(r.p)
        if err != nil {
            // TODO: Make this more granular, error could be message specific
            return nil, err
        }
        if newMsg := r.targetMessage; newMsg != nil {
            newBatch = append(newBatch, newMsg)
        }
    }
    return newBatch, nil
}

I assume an event loop could be integrated as in the following test from the goja_nodejs eventloop package:
https://github.com/dop251/goja_nodejs/blob/master/eventloop/eventloop_test.go#L467

// Example from the goja_nodejs eventloop tests
func TestPromise(t *testing.T) {
    t.Parallel()
    const SCRIPT = `
    let result;
    const p = new Promise((resolve, reject) => {
        setTimeout(() => {resolve("passed")}, 500);
    });
    p.then(value => {
        result = value;
    });
    `

    loop := NewEventLoop()
    prg, err := goja.Compile("main.js", SCRIPT, false)
    if err != nil {
        t.Fatal(err)
    }
    loop.Run(func(vm *goja.Runtime) {
        _, err = vm.RunProgram(prg)
    })
    if err != nil {
        t.Fatal(err)
    }
    loop.Run(func(vm *goja.Runtime) {
        result := vm.Get("result")
        if !result.SameAs(vm.ToValue("passed")) {
            err = fmt.Errorf("unexpected result: %v", result)
        }
    })
    if err != nil {
        t.Fatal(err)
    }
}
@mihaitodor added the enhancement and processors labels on May 31, 2024
@lunacrafts
Author

I kept digging for a workaround, since JavaScript bindings are a must-have in my case, and created a fork of the repository: mirrorboards-forks/connect.

I changed the base Docker image to oven/bun:debian so that the Bun JavaScript runtime is available at the container level. You can see the change in commit cb10dcbb1c6b4f593dcf69fc663102af4fe646bb. The image is published at ghcr.io/mirrorboards-forks/connect.

With this change, I was able to run asynchronous scripts by invoking bun through the command processor (screenshots below):

First script, using JavaScript Promise & setTimeout:

const foo = async () => {
  await new Promise((resolve) => setTimeout(resolve, 5000)); 
  console.log({ value: 'awaited response', date: +new Date() });
};

await foo();

Second script, using native fetch:

const foo = async () => {
  const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto');
  console.log(response);
};

await foo();

If you decide to go with a similar approach, I'd suggest using Bun or a similar runtime. Tools in this space (such as tsx) are becoming standard in the Node ecosystem and can be used with official Node.js via the --loader flag. Bun is significantly faster and handles bundling on the fly for both CommonJS and ESM modules.

import * as pulumi from '@pulumi/pulumi';
import * as k8s from '@pulumi/kubernetes';
import { Namespace } from '@mirrorboards/namespace';

type BenthosPipelineArgs = {
  metadata: {
    name: string
    namespace: string
  }
};

export class BenthosPipeline extends pulumi.ComponentResource {
  public readonly pipeline: k8s.helm.v3.Release;

  constructor(name: string, args: BenthosPipelineArgs, opts?: pulumi.ComponentResourceOptions) {
    super('mirrorboards:processing:benthos:BenthosPipeline', name, args, opts);

    const namespace = new Namespace(args.metadata.namespace);

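    // Two variants: timers only (script) and native fetch (scriptWithFetch); the config below runs scriptWithFetch.
    const script = `const foo = async () => { await new Promise((resolve) => setTimeout(resolve, 5000)); console.log({ value: 'awaited response', date: +new Date() }); }; await foo();`;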
    const scriptWithFetch = `const foo = async () => { const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto'); console.log(response); }; await foo();`;

    this.pipeline = new k8s.helm.v3.Release(
      namespace.get('pipeline'),
      {
        namespace: namespace.get(),
        name: args.metadata.name,
        createNamespace: true,
        repositoryOpts: {
          repo: 'https://benthosdev.github.io/charts/',
        },
        chart: 'benthos',
        version: '2.2.0',
        values: {
          image: {
            repository: 'ghcr.io/mirrorboards-forks/connect',
            pullPolicy: "IfNotPresent",
            tag: "4.28.3"
          },
          config: {
            input: {
              generate: {
                interval: '1s',
                count: 0,
                mapping: `
                  root.id = uuid_v4()
                  root.message = "Heyxx!!!"
                  root.timestamp = now()
                `
              }
            },
            pipeline: {
              processors: [
                {
                  type: "command",
                  command: {
                    name: "bun",
                    args_mapping: `["-e","${scriptWithFetch}"]`
                  },
                }
              ]
            },
            output: {
              stdout: {
                codec: 'lines'
              }
            }
          }
        }
      },
      {
        parent: this,
      },
    );

    this.registerOutputs({
      pipeline: this.pipeline,
    });
  }
}

Screenshot 2024-06-02 at 21 44 05

Screenshot 2024-06-02 at 21 44 34
