BangDB Stream Manager Type

Client API for building apps

BangDB Stream Manager provides the constructs needed to work with streams as time-series data.
It lets users create streams, ingest data, and define processing logic so that ingestion and analysis run continuously and automatically.

C++


To get an instance of BangDB Stream Manager, call the constructor. It takes a pointer to a BangDBEnv object; see BangDBEnv for more details

 BangDBStreamManager(BangDBEnv *env); 
A BangDB stream works on a schema, which the user must define and register with the stream manager before the stream can receive data and process events as defined in the schema.

See BangDB Stream for more information

To register a schema.
 char *registerSchema(const char *schema_json); 
The schema (or app) is in JSON format and contains the details of the stream operations.
It returns NULL for a serious error, or a JSON doc with errcode less than 0 and information on why the registration failed.
If successful then errcode is set to 0 in the returned doc
Users should call delete[] to free the memory
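Many calls on this page follow the same return convention: a heap-allocated JSON string owned by the caller, where NULL signals a serious error and a non-zero errcode signals a failure. The following is a minimal sketch of one way to handle that convention. The names `JsonReply`, `extractErrcode`, `consumeReply` and `demoReplyHandling` are hypothetical helpers, not part of BangDB, and the errcode lookup is a naive string search that assumes a flat reply with an integer `errcode` field; a real application would likely use a JSON parser.

```cpp
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <memory>

// Hypothetical alias: owns a buffer that must be released with delete[].
using JsonReply = std::unique_ptr<char[]>;

// Naive lookup of an integer "errcode" field in a flat JSON reply.
// Returns false when the reply is NULL or has no parsable errcode.
bool extractErrcode(const char *reply, long &code) {
    if (reply == nullptr) return false;                 // NULL means a serious error
    const char *key = std::strstr(reply, "\"errcode\"");
    if (key == nullptr) return false;
    const char *colon = std::strchr(key, ':');
    if (colon == nullptr) return false;
    char *end = nullptr;
    long parsed = std::strtol(colon + 1, &end, 10);     // skips leading whitespace
    if (end == colon + 1) return false;                 // no digits after the colon
    code = parsed;
    return true;
}

// Takes ownership of a raw reply so that delete[] runs automatically,
// and reports success only when the reply carries errcode 0.
bool consumeReply(char *rawReply) {
    JsonReply owned(rawReply);
    long code = 0;
    return extractErrcode(owned.get(), code) && code == 0;
}

// Usage sketch with hand-written replies standing in for real return values.
void demoReplyHandling() {
    long code = 0;
    bool found = extractErrcode("{\"errcode\":-1}", code);
    assert(found && code == -1);

    char *raw = new char[16];                           // stand-in for a returned buffer
    std::strcpy(raw, "{\"errcode\":0}");
    bool ok = consumeReply(raw);                        // buffer is freed inside
    assert(ok);
    (void)found;
    (void)ok;
}
```

In real use, the pointer returned by a call such as registerSchema would be passed to `consumeReply` in place of the hand-written buffer.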

To de-register / delete an existing schema
 char *deregisterSchema(const char *schema_name, bool cleanclose = true); 
schema_name is the name given to the schema by the user.
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To add streams to an existing schema.
 char *addStreams(long schemaid, const char *streams); 
The streams input is a JSON string that contains an array of the streams to be added. The schemaid identifies the schema to which the streams are added.
schemaid is a unique id associated with a particular schema.
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To delete streams from an existing schema
 char *deleteStreams(long schemaid, const char *streams); 
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To set stream state
 char *setStreamState(const char *schema, const char *stream, short st); 
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To get stream state
 int getStreamState(const char *schema, const char *stream); 
The state of stream could be ON or OFF, hence it returns 1 or 0 respectively. For error it returns -1
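The three documented return values can be mapped to a small enum so that callers do not compare against bare integers. This is only a sketch: `StreamState`, `toStreamState` and `describe` are hypothetical names, not part of BangDB, and treating any undocumented value as an error is an assumption.

```cpp
#include <cassert>
#include <string>

// Hypothetical enum mirroring the documented return values of getStreamState.
enum class StreamState { Off = 0, On = 1, Error = -1 };

// Maps the raw int (1 = ON, 0 = OFF, -1 = error) to the enum.
// ASSUMPTION: any other value is treated as an error.
StreamState toStreamState(int raw) {
    switch (raw) {
        case 1:  return StreamState::On;
        case 0:  return StreamState::Off;
        default: return StreamState::Error;
    }
}

// Human-readable label for logging.
const char *describe(StreamState state) {
    switch (state) {
        case StreamState::On:  return "ON";
        case StreamState::Off: return "OFF";
        default:               return "ERROR";
    }
}

// Usage sketch: the literal 1 stands in for a value returned by getStreamState.
void demoStreamState() {
    StreamState state = toStreamState(1);
    assert(state == StreamState::On);
    assert(std::string(describe(state)) == "ON");
    (void)state;
}
```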
To add user defined functions for computing in the schemas
 char *addUdfs(long schema_id, const char *udfs); 
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To delete udf from a given schema using udf name and schema id
 char *delUdfs(long schema_id, const char *udfs); 
If successful then errcode is set to 0 in the returned doc
Else for error it could return NULL or errcode set to -1
Users should call delete[] to free the memory
To list all the user defined functions present in the database
 char *getUdfList(); 
If successful then it returns the list else NULL
Users should call delete[] to free the memory
To get the list of all registered notifications. These are notification templates used to send event notifications, not the actual notifications.
See Notification to know more about notification templates and how to work with them

Note that this is very similar to scanDoc for a table; in fact, it is exactly the same. It takes the query filter as one of the arguments (idx_filter_json).

To see more information on how to scan the table, see the DataQuery section
ResultSet * scanRegisteredNotif(
  ResultSet * prev_rs,
  FDT * pk_skey = NULL,
  FDT * pk_ekey = NULL,
  const char * idx_filter_json = NULL,
  ScanFilter * sf = NULL
);
If successful, it returns resultset reference which could be iterated to read key and value. It returns NULL for error
To get the list of generated notifications, users may scan in the usual way. Note that the query filter can still be used for the scan. See DataQuery to know more about scan
ResultSet * scanNotification(
   ResultSet * prev_rs,
   FDT * pk_skey = NULL,
   FDT * pk_ekey = NULL,
   const char * idx_filter_json = NULL,
   ScanFilter * sf = NULL
);
If successful, it returns resultset reference which could be iterated to read key and value. It returns NULL for error
To insert events into the stream. The event is the doc (json document)
 char *put(long schemaid, long streamid, const char *doc); 
streamid is a unique numerical id associated with a particular stream.
It returns json with errcode set to -1 for error else 0 for success. User should check for NULL as well
User should delete the memory of returned data by calling delete[]
To insert an event into a stream of a given schema with an explicit key (k) and value (v)
 char *put(long schemaid, long streamid, long k, const char *v); 
It returns json with errcode set to -1 for error else 0 for success. User should check for NULL as well
User should delete the memory of returned data by calling delete[]
To scan the stream for a given filter condition. Users may scan the stream in the usual way. Note that the query filter (idx_filter_json) can still be used for the scan. See DataQuery to know more about scan
ResultSet * scanDoc(
   long schemaid,
   long streamid,
   ResultSet * prev_rs,
   FDT * pk_skey = NULL,
   FDT * pk_ekey = NULL,
   const char * idx_filter_json = NULL,
   ScanFilter * sf = NULL
);
To scan aggregate, groupby and entity streams, users should call this API. It takes a special argument, attr_names_json, which defines what kind of data is being scanned. This is again a recursive scan and is used like the other scans (db). See DataQuery to know more about how to use scan effectively
ResultSet * scanProcDoc(
   long schemaid,
   long streamid,
   const char * attr_names_json,
   ResultSet * prev_rs,
   ScanFilter * sf = NULL
);
The attr_names_json defines what to do and for whom this is being called. The structure of the json is
-for aggr = query_json = {
"proc-type": 6,
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"rollup":1
}

-for entity = query_json = {
"proc-type": 6,
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"enty-stream-id":1234
}

-for gpby = query_json = {
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"gpby-val":"x",
"gpby-name":"a1_b2_c3",
"gpby-attrid":123
}
It returns ResultSet, which could be iterated to go over the events. Else it returns NULL for error.
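The aggregate form of attr_names_json can be assembled from its parts rather than written by hand. The sketch below builds only the aggr variant; `toJsonArray`, `buildAggrQuery` and `demoAggrQuery` are hypothetical helpers, not part of BangDB. It assumes the fields are exactly those listed for aggr, that the start key field is spelled `skey`, and that attribute names and keys need no JSON escaping.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Renders attribute names as a JSON array of strings.
// ASSUMPTION: names contain no characters that need JSON escaping.
std::string toJsonArray(const std::vector<std::string> &attrs) {
    std::string out = "[";
    for (std::size_t i = 0; i < attrs.size(); ++i) {
        if (i > 0) out += ",";
        out += "\"" + attrs[i] + "\"";
    }
    out += "]";
    return out;
}

// Builds the aggregate variant of attr_names_json with the documented fields.
std::string buildAggrQuery(const std::vector<std::string> &attrs, int option,
                           const std::string &skey, const std::string &ekey,
                           int rollup) {
    return std::string("{\"proc-type\":6,") +
           "\"attrs\":" + toJsonArray(attrs) + "," +
           "\"option\":" + std::to_string(option) + "," +
           "\"skey\":\"" + skey + "\"," +
           "\"ekey\":\"" + ekey + "\"," +
           "\"rollup\":" + std::to_string(rollup) + "}";
}

// Usage sketch with the placeholder values from the documented structure.
void demoAggrQuery() {
    std::vector<std::string> attrs = {"a", "b"};
    std::string query = buildAggrQuery(attrs, 1, "sk", "ek", 1);
    assert(query == "{\"proc-type\":6,\"attrs\":[\"a\",\"b\"],\"option\":1,"
                    "\"skey\":\"sk\",\"ekey\":\"ek\",\"rollup\":1}");
    (void)query;
}
```

The resulting string's `c_str()` would then be passed as the attr_names_json argument of scanProcDoc or countProc.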
To count the number of events for a given filter condition
long countProc(long schemaid, long streamid, const char *attr_names_json, ScanFilter *sf = NULL); 
It returns -1 for error
To get groupby operation name. Check out more about it in Stream
long getGpbyName(long schemaid, long streamid, const char *gpby_attr_list, char **out_json); 
The GpbyName is the mangled name given by the stream manager to a particular groupby operation. The gpby_attr_list provides the information needed to compute the name.
 gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123} 
It returns -1 for error else 0 for success. The out_json contains the name of the groupby
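The gpby_attr_list input can be generated from the attribute names. The sketch below is based only on the single example given here, in which gpby-name looks like the attribute names joined with underscores; that rule is an inference, not documented behavior. `joinNames`, `buildGpbyAttrList` and `demoGpbyAttrList` are hypothetical helpers, not part of BangDB.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Joins names with a separator, e.g. {"a1","b2","c3"} with "_" gives "a1_b2_c3".
std::string joinNames(const std::vector<std::string> &names, const std::string &sep) {
    std::string out;
    for (std::size_t i = 0; i < names.size(); ++i) {
        if (i > 0) out += sep;
        out += names[i];
    }
    return out;
}

// Builds gpby_attr_list in the shape of the documented example.
// ASSUMPTION: gpby-name is the attribute names joined with '_', and
// no value needs JSON escaping.
std::string buildGpbyAttrList(const std::vector<std::string> &attrs,
                              const std::string &gpbyVal, long gpbyAttrId) {
    assert(!attrs.empty());  // the quoting below relies on at least one attribute
    std::ostringstream json;
    json << "{\"attrs\":[\"" << joinNames(attrs, "\",\"") << "\"],"
         << "\"gpby-val\":\"" << gpbyVal << "\","
         << "\"gpby-name\":\"" << joinNames(attrs, "_") << "\","
         << "\"gpby-attrid\":" << gpbyAttrId << "}";
    return json.str();
}

// Usage sketch reproducing the values from the documented example.
void demoGpbyAttrList() {
    std::vector<std::string> attrs = {"a1", "b2", "c3"};
    std::string list = buildGpbyAttrList(attrs, "x", 123);
    assert(list.find("\"gpby-name\":\"a1_b2_c3\"") != std::string::npos);
    (void)list;
}
```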
To count total number of events present in the given stream
 long count(long schemaid, long streamid); 
It returns -1 for error else the count
To get number of events present for a given condition or filter query (idx_filter_json)
long count(long schemaid, long streamid, FDT *pk_skey, FDT *pk_ekey, const char *idx_filter_json = NULL, ScanFilter *sf = NULL); 
It returns -1 for error else the count
To get the count of events pushed into the raw streams
ResultSet * scanUsage(
   ResultSet * prev_rs,
   long fromts,
   long tots,
   int rollup,
   ScanFilter * sf = NULL
);
It returns Resultset for success or NULL for error
To get the schema id for an existing schema
 long getSchemaid(const char *schema_name, bool check_valid = true); 
It returns -1 for error else the schemaid
To get the stream id for a stream in an existing schema
 long getStreamid(const char *schema_name, const char *stream_name, bool check_valid = true); 
It returns -1 for error else the streamid
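Because getSchemaid and getStreamid both signal failure with -1, it can help to validate both ids once before passing them to calls such as put, scanDoc or count. A minimal sketch follows; `StreamRef`, `makeStreamRef` and `demoStreamRef` are hypothetical names, not part of BangDB.

```cpp
#include <cassert>

// Hypothetical holder for the pair of ids that the stream calls expect.
struct StreamRef {
    long schemaid;
    long streamid;
};

// Fills `out` and returns true only when neither lookup reported -1.
bool makeStreamRef(long schemaid, long streamid, StreamRef &out) {
    if (schemaid == -1 || streamid == -1) return false;
    out.schemaid = schemaid;
    out.streamid = streamid;
    return true;
}

// Usage sketch: the literals stand in for values returned by
// getSchemaid and getStreamid.
void demoStreamRef() {
    StreamRef ref = {0, 0};
    bool ok = makeStreamRef(12, 34, ref);
    assert(ok && ref.schemaid == 12 && ref.streamid == 34);
    bool failed = !makeStreamRef(12, -1, ref);   // stream lookup failed
    assert(failed);
    (void)ok;
    (void)failed;
}
```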
To get the entire schema (json structure). This API returns from the Stream memory and not from the stored metadata.
 char *getSchemaStr(const char *schema_name); 
It returns NULL for error, or JSON with a non-zero errcode for other errors.
For success it returns the schema.
User should delete the memory of returned data by calling delete[]
To get the entire schema (json structure) from metadata. Usually both this and previous schema would be the same, but in some cases they could be different.
 char *getSchemaFromMetadata(const char *schema_name); 
User should delete the memory of returned data by calling delete[]
To get the dependency graph for a given schema, users may call this API. It returns a JSON doc defining the entire dependency graph for the schema
 char *getSchemaDepGraph(long schema_id, bool bfs = true); 
The schema is structured as a graph within the stream manager. This api will return the graph for the given schema.
It returns NULL for error
User should delete the memory of returned data by calling delete[]
To get dependency graph for a given stream
 char *getStreamDepGraph(long schema_id, long stream_id, bool only_dep = false); 
This API returns the graph for the given stream of a schema. See the Stream section to know more about the graph
It returns NULL for error
User should delete the memory of returned data by calling delete[]
To get list of all schemas present in the database
 char *getSchemaList(); 
This returns a JSON doc with the list of all the schemas, or NULL for error. It may also set errcode to -1 for some errors
User should delete the memory of returned data by calling delete[]
To close the stream manager in the end
 void closeBangDBStreamManager(CloseType ctype = DEFAULT_AT_CLIENT); 
CloseType is an enum with the following values:

DEFAULT_AT_CLIENT,
CONSERVATIVE_AT_SERVER,
OPTIMISTIC_AT_SERVER,
CLEANCLOSE_AT_SERVER,
SIMPLECLOSE_AT_SERVER,
DEFAULT_AT_SERVER

Please see more on this at bangdb common

 
