Using Nuclio
Nuclio is an open-source and managed serverless platform used to minimize development and maintenance overhead and automate the deployment of data-science-based applications. The framework focused on data, I/O, and compute-intensive workloads. It is well integrated with popular data science tools, such as Jupyter and Kubeflow, supports a variety of data and streaming sources, and supports execution over CPUs and GPUs.
You can use Nuclio through a fully managed application service, the Lyve cloud analytics platform. MLRun serving utilizes serverless Nuclio functions to create multi-stage real-time pipelines.
Nuclio addresses the desired capabilities of a serverless framework:
Real-time processing with minimal CPU/GPU and I/O overhead and maximum parallelism
Native integration with a large variety of data sources, triggers, processing models, and ML frameworks
Stateful functions with data-path acceleration
rabbit-mq from mlrun
exchangeName: The exchange that contains the queue
topics and queueName: They are mutually exclusive. The trigger can either create an existing queue specified by queueName or create its queue, subscribing it to topics.
topics: If you Specify the trigger, create a queue with a unique name and subscribe it to these topics.
url: It is the actual host URL and port details where queue address details are available.
import mlrun import json class Echo: def __init__(self, context, name=None, **kw): self.context = context self.name = name self.kw = kw def do(self,x): y = type(x) print("Echo:", "done consuming", y) return x function = mlrun.code_to_function("rabbit2",kind="serving", image="mlrun/mlrun") trigger_spec={"kind":"rabbit-mq", "maxWorkers": 1, "url": "amqp://athena-spr-kpiv-prod-admin:46f08c5d998e@spr-athena-rabbitmq.stni.seagate.com:5672/athena-spr-kpiv-prod-01", "attributes":{ "exchangeName": "athena.spr.topic.inference", "queueName": "athena-spr-sampling-shadow-validate", "topic": "#.sampling.#"}, } function.add_trigger("rabbitmq",trigger_spec) graph = function.set_topology("flow", engine="async") graph.to(class_name="Echo", name="testingRabbit") function.deploy()