Data Sharing Between Apps

Interop

JavaScript

Method Registration

Registering Methods

The Interop API can be accessed through glue.interop.

See our JavaScript Interop example on GitHub.

To offer a method to other applications, call glue.interop.register() and pass the definition of the method and a callback to handle invocations from clients.

Method signature:

register(definition: string | MethodDefinition, handler: (args: object, caller: Instance) => object | void): void;

Where definition is a string representing the method name, or an object holding the method name, signature and other properties of the method (see below), while handler is the JavaScript function that will be called when the method is invoked.

Example:

glue.interop.register({
    name: "Sum", // required - method name 
    accepts: "int a, int b", // optional - parameters signature
    returns: "int answer" // optional - result signature
  },
  (args) => {   // required - handler function
      return { answer: args.a + args.b }; 
      }
);

Once a method is registered, it can be invoked from any app (web or native) that is Glue42 enabled.

Method Definition

The method definition is an object describing the Interop method your application is offering. It has the following properties:

Property Description
name Required. The name of the method, e.g. OpenClientPerformance()
accepts Signature, describing the parameters that the method expects (see below)
returns Signature, describing the properties of the object the method returns
displayName The actual name of the method that should be used in UI applications, e.g. "Open Client Performance"
description Description of what the method does, useful for documentation purposes and for UI clients, e.g. "Launches or activates the Client Performance application"
objectTypes The entities this method is meant to work on, e.g. a party (client, prospect, lead), instrument, order, etc. (more on this in the Object Types section)

Your method can be called with arguments, even if you don't specify a parameter signature. The Interop library does not enforce restrictions on the number and types of the arguments. It is similar to JavaScript and the purpose of that is for you to be able to easily upgrade a method by adding one or more optional parameters. Your app will support both the apps with the new parameter configuration, as well as the ones with the old configuration.
If your method doesn't need to return anything, you don't need to define its result signature (the returns property).
It is a good idea to always specify displayName and description in the method definition. These can be used by a generic UI (such as Interop Viewer, or by your own applications. In the Object Types section you can see an example of this.

Method Signature

The signature of a method is a comma-delimited string of parameters, defined as follows:

type [array_modifier] [optional_modifier] parameter_name [composite_schema] [description]

Where type is one of: Bool, Int, Long, Double, String, DateTime, Composite ("Composite" is explained below) and is case-insensitive, so bool and BOOL are the same thing.

Examples:

Signature Explanation
String pId, String? dynamicsId pId is required, dynamicsId is optional
String branchCode, String[] gIds branchCode and gIds are required, gIds is an array of strings
Composite: { String first, String last } name name is a composite parameter, and its schema is defined by 2 required strings - first and last

Composite is a structure which contains one or more fields of type:

  • scalar (bool, int, etc.)
  • array of scalars
  • a composite (nested structure)
  • an array of composites

Using Composite you can define almost any non-recursive, non-self-referential structure.

Returning Results

If your method returns a result, make sure it returns an object with the result as a key (you can return multiple results), otherwise it won't work:

Correct:

  const sum = (args) => {
    return { answer: args.a + args.b }; // object
  };

Wrong:

  const sum = (args) => args.a + args.b; // not an object

Asynchronous Results

Sometimes you might need to return the result of a method asynchronously, e.g. your code needs to invoke a web service and wait for the result before returning. You can use glue.interop.register() to register asynchronous methods as well:

glueServer.interop.register(myAsyncMethod, async () => {

    const response = await fetch("https://docs.glue42.com");

    if (response.ok) {
        return 42;
    } else {
        throw new Error("The doc site is down!");
    };
});

Method Invocation

Invoking Methods

You can invoke methods of a specified instance, an arbitrary instance or many instances. To invoke a method, offered by another application, call glue.interop.invoke() and pass the name of the method, its arguments as an object, and optionally provide a target and invocation options. Use then() on the returned Promise, or pass success/error callbacks to retrieve the result of the invocation.

The method signature is:

 invoke(method: string | MethodDefinition,
  argumentObj?: object,
  target?: string | Instance | Instance[],
  options?: InvokeOptions,
  success?: InvokeSuccessHandler<any>,
  error?: InvokeErrorHandler): Promise<InvocationResult<any>>;
  • arguments - a plain JavaScript object (or JSON) holding key/value pairs passed as named arguments to the handler of the registered Interop method.

  • target - can be one of:

    • best - executes the method on the best (or first) server, effectively leaving the Interop runtime to select the appropriate server;
    • all - executes the method on all servers offering it;
    • skipMine - like all, but skips the current server;
    • Interop instance (or a subset, used for filtering), e.g. { application: "appName" };
    • an array of Interop instances/filters;

The properties of the Interop instance can have both a string or a regular expression as a value.

  • InvokeOptions - an object with optional properties:
    • waitTimeoutMs (in ms, default is 30 secs) - timeout to discover the method, if not immediately available
    • methodResponseTimeoutMs (in ms, default is 30 secs) - timeout to wait for a method reply

The returned Promise resolves with an InvocationResult object from which you can access the result from invoking the method by using the returned property of the InvocationResult object:

Example:

glue.interop.invoke(
    "Sum",
    { a: 37, b: 5 }) // everything else is optional
    .then(successResult => {
        console.log(`37 + 5 = ${successResult.returned.answer}`);
    })
    .catch(err => {
        console.error(`Failed to execute Sum ${err.message}`);
    });

Here is an example of a method that will fail, because it is not available, and also an example of controlling the timeouts:

glue.interop.invoke(
    "NotExisting",
    {},            // no args
    undefined,     // default target (see below)
    {              // options (optional)
        waitTimeoutMs: 10000,
        methodResponseTimeoutMs: 1
    })
    .then(() => console.log("Surprisingly, it worked."))
    .catch(console.error);

Targeting

If multiple apps are offering the same method, you can choose to invoke the method on the best application instance (default, if no target is passed), on a specific instance, on a set of instances, or on all instances (broadcast).

Targeting

Application instances are ranked internally, the "best" instance being the first one running on the user's desktop and under the user's name. If there are multiple applications matching these criteria, the first instance is used.

To invoke a method on a preferred set of applications, pass a target argument to its third optional parameter. A single object specifies a single application instance (filter), and an array specifies a set of applications.

To invoke a method on all applications offering the method, call glue.interop.invoke() with target all.

E.g., if you have a method Greet(string who), then:

  • If nothing is passed, best is default:
glue.interop.invoke("Greet", { who: "Devs" });

is the same as:

glue.interop.invoke("Greet", { who: "Devs" }, "best");
  • To target a specific application, you need to pass an instance (or an instance filter, as below):
glue.interop.invoke("Greet", { who: "Devs" }, { application: "MessageEmployees" });
  • To target a set of known applications, you can pass an array of instances:
const targets = glue.interop.servers().filter((server) =>
  server.application.startsWith("Message")
);
glue.interop.invoke("Greet", { who: "Devs" }, targets);
  • Finally, to broadcast to all applications offering this method, pass all as the target:
glue.interop.invoke("Greet", { who: "Devs" }, "all");

Multiple Results

Invoking a method on multiple Interop instances produces multiple results. Use the all_return_values property of the InvocationResult object to obtain an array of all inner InvocationResult objects. If any of the invocations succeeds, the success callback will be executed, otherwise - the error one.

glue.interop.invoke(
    "Sum",
    { a: 37, b: 5 },
    "all")
    .then(successResult => {
        successResult
        .all_return_values
        .forEach(value => {
            console.log(`Result: ${value.returned.answer}`);
        })
    })
    .catch(err => {
        console.error(`Failed to execute Sum ${err.message}`);
    });

Object Types

Overview

You can register an Interop method with one or more Object Type restrictions. This specifies that the method will handle requests for specific types of objects, such as an Instrument, an Order, etc.

This can be very useful in determining (at runtime) which methods are applicable to an object that the user is currently interacting with, e.g. the currently selected Instrument.

For Object Types to be used in a generic manner, all applications need to agree on the data format of the method signature and pass the selected object (and any additional information needed) to the offered method.

Registering Methods with Object Types

Method signature:

glue.interop.register({
    name: "SetParty",
    accepts: "composite: { string gId, string dynamicsId } party",
    objectTypes: ["Party"]
}, (args) => console.log(`gId = ${args.party.gId}, dynamicsId = ${args.party.dynamicsId}`)
);

Now let's find all methods working with parties:

const partyMethods = glue.interop.methods().filter((method) =>
    method.objectTypes && method.objectTypes.includes("Party")
);
console.table(partyMethods);

And here is how you can call all servers offering the method:

glue.interop.invoke("SetParty", {
    party: {
        gId: "01234567"
    }
}, "all") // target all Interop servers
    .then(() => { })
    .catch(console.error);

To register a method applicable to a particular type of object, set the objectTypes property to an array of strings in the method definition. A method typically supports a single object type, but there could be a case where the method supports more than one object type (e.g. an Instrument and an Order, where the method can extract the Instrument information from the Order).

Method Invocation with Targeting an Object Type

Invoking a method, registered with one or more object types, is no different than invoking any other method. In fact, you are not required to pass an objectType in the invocation options but doing so will help the implementor determine the "shape" of the object without any ambiguity.

The more interesting case is finding the methods applicable to an object type. The method glue.interop.methods() returns an array of all methods. To query a method with a specific object type simply means to apply a filter on the array:

const partyMethods = glue.interop.methods().filter((method) =>
    method.objectTypes && method.objectTypes.includes("Party")
);

Note, however, that once you find the methods applicable to an object type, any of these can be offered by multiple applications. So, if you plan on calling a method from all servers providing it, make sure you pass all for the target, or let the user choose via the UI which application should be called.

Discovery

Discovering Methods and Servers

Let's register the methods below and practice some of the above API calls:

glue.interop.register({
    name: "UP.OpenDigitalProfile",
    displayName: "Open User Provisioning Digital Profile",
    accepts: "composite: { string portf, string dynamics } party",
    objectTypes: ["Party"]
},
    (args) => console.log(`Opening Digital Profile for ${JSON.stringify(args.party, undefined, 2)}`)
);

glue.interop.register({
    name: "CC.OpenClientPerformance",
    displayName: "Open Client Performance",
    accepts: "composite: { string portf, string dynamics } party",
    objectTypes: ["Party"]
},
    (args) => console.log(`Opening Client Performance for ${JSON.stringify(args.party, undefined, 2)}`)
);

Listing methods on all servers:

console.table(glue.interop.methods());

Filtering by any of the properties of the method definition, e.g. name, accepts, returns:

console.log(JSON.stringify(glue.interop.methods({ name: "CC.OpenClientPerformance" }), undefined, 2));

Listing all servers offering Interop methods:

glue.interop.servers();

Querying which methods are offered by a given instance:

console.table(glue.interop.methodsForInstance({ application: "AppManager" }));

Listing the methods of your application:

console.table(glue.interop.methodsForInstance(glue.interop.instance));

Combining the above to get all servers and the methods they offer:

glue.interop.servers().forEach(
    (server) => {
        console.log(server.application);
        console.table(glue.interop.methodsForInstance(server));
    });

Querying which methods are applicable to a particular object type:

console.table(glue.interop.methods().filter((method) =>
    method.objectTypes && method.objectTypes.includes("Party")
));

Using the Interop Viewer, you can see which Interop application offers which methods and which method is offered by which applications. You can then invoke and subscribe to methods by one or more applications.

If you want to test the method invocations performed by your application, you can use the Interop Viewer to register dummy methods.

Some advanced scenarios might require your application to be able to discover applications and methods using code. For this purpose, the Interop library offers the following methods:

Function Description
methodAdded((method: MethodDefinition) => void) Called when a method is added for the first time by any application
methodRemoved((method: MethodDefinition) => void) Called when a method is removed from the last application offering it
serverAdded((server: InteropInstance) => void) Called when an application offering methods is discovered
serverRemoved((server: InteropInstance) => void) Called when an app offering methods stops offering them or exits
serverMethodAdded((method: MethodDefinition, server: InteropInstance) => void) Called when a method is offered by an application (for all apps)
serverMethodRemoved((method: MethodDefinition, server: InteropInstance) => void) Called when an app stops offering a method
servers() Finds all apps offering Interop methods (can be filtered)
methods() Finds all offered methods by all Interop apps (can be filtered)
methodsForInstance(server: InteropInstance) Finds all methods offered by a specific app instance

Streaming

Overview

Your application can publish events that can be observed by other applications, or it can provide real-time data (e.g., market data, news alerts, notifications, etc.) to other applications by publishing an Interop stream. Your application can also receive and react to these events and data by creating an Interop stream subscription.

Applications that create and publish to Interop streams are called publishers, and applications that subscribe to Interop Streams are called subscribers. An application can be both.

Streaming

Interop streams are used extensively in Glue42 Enterprise products and APIs:

  • to publish notifications about window status change (events);
  • to publish application configuration changes and notifications about application instance state changes (events);
  • in the Glue42 Notification Service (GNS) Desktop Manager and GNS Interop Servers - to publish Notifications (real-time data);
  • in the Glue42 Search Service (GSS) Desktop Manager and GSS Interop Servers - to publish results for a type-ahead query about an entity (events);
  • in the Window Management, Application Management, and Activities APIs (events);

See our JavaScript Streaming example on GitHub.

The snippets below are based on the Complete Streaming Example at the end of the Streaming section. The example illustrates an application which retrieves real-time data about a financial instrument (symbol) from a data source and provides the data as a stream to other applications. The client applications subscribe for the stream data by providing a symbol value as an argument in their subscription requests.

Publishing Stream Data

Creating Streams

To start publishing data, you need to create an Interop stream by calling glue.interop.createStream(). This registers an Interop method similar to the one created by glue.interop.register(), but with streaming semantics.

Here is the signature of the method:

function createStream(
    methodDefinition: String | MethodDefinition,
    options?: StreamOptions,
    successCallback?: (args?: object) => void,
    errorCallback?: (error?: string | object) => void
): Promise<Stream>

The MethodDefinition is identical to the Interop method definition for the glue.interop.register() method. If you pass a string to the definition, it will be used as a stream name:

const stream = await glue.interop.createStream("MarketData.LastTrades");

Which is identical to:

glue.interop.createStream({ name: "MarketData.LastTrades" });

The StreamOptions object allows you to pass several optional callbacks which let your application handle subscriptions in a more detailed manner:

  • to identify individual subscribers/clients;
  • to accept or reject subscriptions based on the subscription arguments;
  • to unicast data as soon as a client subscribes to the stream;
  • to group subscribers, which use the same subscription arguments, on a stream branch and then publish to that branch, which will multicast data to all these subscribers;

StreamOptions object example:

const streamOptions = {
    subscriptionRequestHandler: (subscriptionRequest) => {},
    subscriptionAddedHandler: (streamSubscription) => {},
    subscriptionRemovedHandler: (streamSubscription) => {}
};

And here is an example of creating a stream:

// Stream definition.
const streamDefinition = {
    name: "MarketData.LastTrades",
    displayName: "Market Data - Last Trades",
    accepts: "String symbol",
    returns: "String symbol, Double lastTradePrice"
};

// Stream options object containing subscription request handlers.
const streamOptions = {
    subscriptionRequestHandler: (subscriptionRequest) => {},
    subscriptionAddedHandler: (streamSubscription) => {},
    subscriptionRemovedHandler: (streamSubscription) => {}
};

// Creating the stream.
let stream;

async function initiateStream() {
    stream = await glue.interop.createStream(streamDefinition, streamOptions);
    console.log(`Stream "${stream.definition.displayName}" created successfully.`);
};

initiateStream().catch(console.error);

Accepting or Rejecting Subscriptions

Subscriptions are auto accepted by default. You can control this behavior by passing a subscriptionRequestHandler in the StreamOptions object. Note that this handler is called before the subscriptionAddedHandler, so if you reject the request, the subscriptionAddedHandler will not be called.

The subscriptionRequest object has the following properties:

  • instance - the Interop instance of the subscriber application, e.g. { application: "Portfolio", ... };
  • arguments - an object containing the subscription arguments, e.g. { symbol: "GOOG" };
  • accept() - accepts the instance subscription;
  • acceptOnBranch() - accepts the subscription on a branch with the provided string argument as a name (e.g., subscription.acceptOnBranch("GOOG")). Pushing data to that branch will multicast it to all subscriptions associated with the branch;
  • reject() - rejects the subscription and returns the provided string argument as a reason for the rejection (e.g., subscription.reject("Subscription rejected: invalid arguments."));

Example of a subscriptionRequestHandler:

function onSubscriptionRequest(subscriptionRequest) {

    // Here you can identify, accept or reject subscribers,
    // group subscribers on a shared stream branch, access the subscription arguments.

    const application = subscriptionRequest.instance.application;
    const symbol = subscriptionRequest.arguments.symbol;

    // If the subscription request contains a `symbol` property in the its `arguments` object,
    // accept it on a stream branch with the provided symbol as a branch key,
    // otherwise, reject the subscription.
    if (symbol) {
        subscriptionRequest.acceptOnBranch(symbol);
        console.log(`Accepted subscription by "${application}" on branch "${symbol}".`);
    } else {
        subscriptionRequest.reject("Subscription rejected: missing `symbol` argument.");
        console.warn(`Rejected subscription by "${application}". Symbol not specified.`);
    };
};

Added and Removed Subscriptions

By default, nothing happens when a new subscription is added or removed. You may, however, want to push data to the subscriber, if the data is available, or unsubscribe from the underlying data source, when the last subscriber for that data is removed. Use the subscriptionAddedHandler and the subscriptionRemovedHandler in the StreamOptions object to achieve this.

Handling New Subscriptions

Example of a subscriptionAddedHandler:

const symbolPriceCache = {
    "GOOG": {
        price: 123.456
    }
}

function onSubscriptionAdded(streamSubscription) {

    const symbol = streamSubscription.arguments.symbol;
    const isFirstSubscription = symbolPriceCache[symbol] ? false : true;

    if (isFirstSubscription) {
        // If this is a first subsription for that symbol,
        // start requesting data for it and cache it.
        symbolPriceCache[symbol] = {};
        startDataRequests(symbol);
        console.log(`First subscription for symbol "${symbol}" created.`);
    } else {
        // If there is already an existing subscription for that symbol,
        // send a snapshot of the available price to the new subscriber.
        const price = symbolPriceCache[symbol].price;

        // Check first whether a price is available.
        if (price) {
            const data = { symbol, price };

            // Unicast data directly to this subscriber.
            streamSubscription.push(data);
            console.log(`Sent snapshot price for symbol "${symbol}".`);
        };
    };
};

function startDataRequests(symbol) {
    // Here you can make requests to a real-time data source.
}

Handling Last Subscription Removal

Example of a subscriptionRemovedHandler:

function onSubscriptionRemoved(streamSubscription) {

    const symbol = streamSubscription.arguments.symbol;
    const branch = streamSubscription.stream.branch(symbol);

    // If there are no more subscriptions for that symbol,
    // stop requesting data and remove the symbol from the cache.
    if (branch === undefined) {
        stopDataRequests(symbol);
        delete symbolPriceCache[symbol];
        console.warn(`Branch was closed, no more active subscriptions for symbol "${symbol}".`);
    };
};

function stopDataRequests(symbol) {
    // Terminate the requests to the data source.
};

Using Stream Branches

If your stream publishing code uses branches (e.g., creates a branch for each unique set of subscription arguments and associates the subscriptions with that branch), whenever a data arrives from your underlying source, you can use the branch to publish the data to the subscribers on that branch instead of manually going over all subscriptions and pushing data to the interested clients.

Example:

// Extract the data returned in the response from the data source, e.g.:
// const symbol = responseData.symbol;
// const price = responseData.price;
const data = { symbol, price };

// The subscriptions have been accepted on branches with the `symbol`
// provided in the subscription requests as a branch key,
// so now the same `symbol` is used to identify the branch to which to push data.
stream.push(data, symbol);

Server Side Subscription Object

The StreamSubscription object has the following properties:

  • arguments - the arguments used by the client application to subscribe;
  • stream - the stream object you have registered, so you don't need to keep track of it;
  • branchKey - the key of the branch (if any) with which the stream publisher has associated the client subscription;
  • instance - the instance of the subscriber;
  • push() - a method to push data directly to a subscription (unicast);
  • close() - method which closes the subscription forcefully on the publisher side, e.g. if the publisher shuts down;

Stream Object

The Stream object has the following properties:

  • definition - the definition object with which the stream was created;
  • name - the name of the stream as passed in the definition object;
  • subscriptions() - returns a list of all subscriptions;
  • branches() - returns a list of all branches;
  • close() - closes the stream and unregisters the corresponding Interop method;

Branch Object

The StreamBranch object has the following properties:

  • key - the key with which the branch was created.
  • subscriptions() - returns all subscriptions which are associated with this branch;
  • close() - closes the branch (and drops all subscriptions on it);
  • push() - multicasts data to all subscriptions on the branch. This is always more efficient than keeping track of the subscriptions manually and doing it yourself;

Consuming Stream Data

Subscribing to a Stream

Streams are simply special Interop methods, so subscribing to a stream resembles very much invoking a method. To subscribe, you need to create a subscription by calling glue.interop.subscribe().

Method signature:

function subscribe(
    methodDefinition: string | MethodDefinition, 
    parameters?: SubscriptionParams {
        arguments?: object,
        target?: InstanceTarget,
        waitTimeoutMs?: number,
        methodResponseTimeout?: number,
        onData?: (data: StreamData) => void,
        onClosed?: () => void,
        onConnected?: (server: Instance, reconnect: boolean) => void
    }
): Promise<Subscription>
  • methodDefinition - Required. Accepts a string (method name) or a MethodDefinition object;
  • parameters - an optional SubscriptionParams object:
    • arguments - object containing arguments to be used to subscribe to the stream. Passing arguments enables you to group subscribers that use the same arguments on a stream branch (see Publishing Stream Data), and/or use these as a filter on the side of the publisher;
    • target- an InstanceTarget object that can be one of "best", "all", "skipMine", Instance or Instance[] (see Invoking Methods);
    • waitTimeoutMs - timeout to discover the stream, if not immediately available;
    • methodResponseTimeout - timeout to wait for the stream reply;
    • onData - callback to handle the event when new data is received;
    • onClosed - callback to handle the event when the subscription is closed by the server;
    • onConnected - callback to handle the event when the subscription is connected to a server;

And here is a basic example of how to create a subscription:

const subscriptionOptions = {
    arguments: { symbol: "GOOG" }
};

// Creating the subscription.
let subscription;

async function createSubscription() {
    subscription = await glue.interop.subscribe("MarketData.LastTrades", subscriptionOptions);
};

createSubscription().catch(console.error);

// Use subscription here.

Handling Subscriptions Client Side

The client side Subscription object has several useful properties providing information about the subscription instance, as well as methods for handling subscription events and stream data.

Subscription Information

  • requestArguments - arguments used to make the subscription;
  • serverInstance - instance of the application providing the stream;
  • stream - the stream definition object;

Receiving Stream Data

Once you have a subscription, you can use its onData() method to handle stream data. The callback you register with the onData() method of the Subscription object will fire every time new stream data is received:

subscription.onData((streamData) => {
    // Use stream data here.
});

The streamData argument is an object which has the following properties:

  • requestArguments - the subscription request arguments;
  • data - the data object published by the stream publisher;
  • private - a flag indicating whether the data was unicast to this subscription (false, if multicast from a stream or a stream branch);
  • server - the Interop instance which pushed the data;
  • message - message from the publisher of the stream;

Closed or Rejected Subscriptions

A stream subscription can be closed at any time due to the publisher shutting down, or due to an error. Two methods handle these events:

subscription.onClosed(() => {
    // Closed gracefully by the publisher.
});

and:

subscription.onFailed((error) => {
    // Unexpected error in the publisher.
});

Stream Discovery

Streams are special Interop methods, so you can use the Interop Discovery API to find available streams. The only difference is that streaming methods are flagged with a property supportsStreaming(true).

Finding all streams:

const streams = glue.interop.methods().filter(method => method.supportsStreaming === true);

Finding a known stream:

const stream = glue.interop.methods().find(method => method.name === "MarketData.LastTrades");

Complete Streaming Example

Below is a complete streaming example containing server side and client side code. The server publishes a stream that simulates fetching real time market data for financial instruments from a data source and sending it to subscribers by using stream branches. A new branch is created for each unique instrument symbol (the branch key is the symbol itself). The client subscribes for this data by providing the symbol of the financial instrument in the subscription request. The server defines callbacks for handling the subscription, and the client defines callbacks for handling new data and the event which fires when the server closes the subscription. Logging is provided both on the server and on the client side to allow you to follow the streaming events more easily.

To test the example:

  1. Open the console of a Glue42 enabled application and paste the Stream Publisher example in it.

  2. Open the console of a different Glue42 enabled application and paste the Stream Consumer example in it.

  3. To simulate multiple subscriptions, repeat step 2 with several other Glue42 enabled applications. Change the value of the symbol property (use any random string) in the arguments object of the subscription options to simulate subscriptions for different financial instruments. (Don't forget to change the names of the variables that have already been declared if you want to make more than one subscription from the same client.)

  4. Experiment with the stream, stream.branches() and subscription objects in the console to trigger the event handlers. For more information on what you can do, explore the Interop API reference documentation.

Stream Publisher

// Cache object that will contain all symbols and symbol prices 
// for which there are active subscriptions.
const symbolPriceCache = {};

// Variable that will hold the stream object.
let stream;

/** SUBSCRIPTION HANDLERS **/

function onSubscriptionRequest(subscriptionRequest) {

    const application = subscriptionRequest.instance.application;
    const symbol = subscriptionRequest.arguments.symbol;

    // If the subscription request contains a `symbol` property in the its `arguments` object,
    // accept it on a stream branch with the provided symbol as a branch key,
    // otherwise, reject the subscription.
    if (symbol) {
        subscriptionRequest.acceptOnBranch(symbol);
        console.log(`Accepted subscription by "${application}" on branch "${symbol}".`);
    } else {
        subscriptionRequest.reject("Subscription rejected: missing `symbol` argument.");
        console.warn(`Rejected subscription by "${application}". Symbol not specified.`);
    };
};

function onSubscriptionAdded(streamSubscription) {

    const symbol = streamSubscription.arguments.symbol;
    const isFirstSubscription = symbolPriceCache[symbol] ? false : true;

    if (isFirstSubscription) {
        // If this is a first subsription for that symbol,
        // start requesting data for it and cache it.
        symbolPriceCache[symbol] = {};
        startDataRequests(symbol);
        console.log(`First subscription for symbol "${symbol}" created.`);
    } else {
        // If there is already an existing subscription for that symbol,
        // send a snapshot of the available price to the new subscriber.
        const price = symbolPriceCache[symbol].price;

        // First check first whether a price is available.
        if (price) {
            const data = { symbol, price };
            streamSubscription.push(data);
            console.log(`Sent snapshot price for symbol "${symbol}".`);
        };
    };
};

function onSubscriptionRemoved(streamSubscription) {

    const symbol = streamSubscription.arguments.symbol;
    const branch = streamSubscription.stream.branch(symbol);

    // If there are no more subscriptions for that symbol,
    // stop requesting data and remove the symbol from the cache.
    if (branch === undefined) {
        stopDataRequests(symbol);
        delete symbolPriceCache[symbol];
        console.warn(`Branch was closed, no more active subscriptions for symbol "${symbol}".`);
    };
};

/** PUBLISHING THE STREAM **/

// Stream definition.
const streamDefinition = {
    name: "MarketData.LastTrades",
    displayName: "Market Data - Last Trades",
    accepts: "String symbol",
    returns: "String symbol, Double lastTradePrice"
};

// Stream options object containing subscription request handlers.
const streamOptions = {
    subscriptionRequestHandler: onSubscriptionRequest,
    subscriptionAddedHandler: onSubscriptionAdded,
    subscriptionRemovedHandler: onSubscriptionRemoved
};

// Creating the stream.
async function initiateStream() {
    stream = await glue.interop.createStream(streamDefinition, streamOptions);
    console.log(`Stream "${stream.definition.displayName}" created successfully.`);
};

initiateStream().catch(console.error);

/** HELPER FUNCTIONS **/

function startDataRequests(symbol) {
    // Set up a task to send requests to a data source every 5 seconds.
    symbolPriceCache[symbol].pollingTask = setInterval(fetchMarketData, 5000, symbol);
};

function stopDataRequests(symbol) {

    const pollingTask = symbolPriceCache[symbol].pollingTask;

    // Stop the requests to the data source.
    clearInterval(pollingTask); 
};

function fetchMarketData(symbol) {

    // Here is the place to create actual requests to a data source
    // and push the received data to the subscribers on the respective branch.
    const price = Math.random() * 1000;
    const data = { symbol, price };

    // Push the `data` to all subscribers on the branch with key `symbol`.
    stream.push(data, symbol);

    // Cache the price for the symbol in order to use it
    // as a snapshot for a subsequent subscription.
    symbolPriceCache[symbol].price = price;
};

Stream Consumer

// Subscription options object containing subscription arguments
// and callbacks to handle new data and closing the subscription.
const subscriptionOptions = {
    arguments: { symbol: "GOOG" },
    onData: (streamData) => console.log(streamData.data.symbol, streamData.data.price),
    onClosed: () => console.warn("Subscription closed by server.")
};

// Creating the subscription.
let subscription;

async function createSubscription() {
    subscription = await glue.interop.subscribe("MarketData.LastTrades", subscriptionOptions);
};

createSubscription().catch(console.error);

Reference

Reference