Developing with REST API
  • Updated on 16 Oct 2019
  • 5 minutes to read
  • Contributors
  • Print
  • Share
  • Dark
    Light

Developing with REST API

  • Print
  • Share
  • Dark
    Light

Overview

Globally distributed applications need a geo distributed fast data platform that can transparently replicate the data anywhere in the world to enable the applications to operate on a copy of the data that's close to its users. Similarly the applications need geo-replicated and local streams to handle pub-sub, ETL and real-time updates from the fast data platform.

C8 is a fully managed geo-distributed fast data service with turnkey global distribution and transparent multi-master replication. You can run globally distributed, low-latency workloads with C8. This article is an introduction to using C8 via its REST APIs.

comment:
Using C8 is quite simple. Many activites that take large amount of time (days) and efforts (days, months,...) can be done in C8 quite quickly. For example, you can create a geo-distributed database spanning across the globe in a second with a single rest api call. Same for creating geo-replicated streams as well as other activites. Good to wear your skepticsm hat and prove to yourself.

# author: Pratik Narode

Pre-Requiste

A tenant account (and credentials) with Macrometa Fast Data service.

Tutorial Steps

Constants

Let's define following constants which will be used throughout the tutorial.

FEDERATION_URL = "https://gdn1.macrometa.io"

TENANT_NAME = "tenant1"
TENANT_USER = "root"
TENANT_PWD = "tenant-pwd"

GUEST_USER = "guest-user"
GUEST_MAIL = "guest@example.com"
GUEST_PWD = "guest-user-pwd"
GEO_FABRIC =  "geofabric1"

COLLECTION_NAME = "testcollection"
SUBSCRIPTION_NAME = "my-sub"

Create a https session

session = requests.Session()
session.auth = (TENANT_NAME, TENANT_USER, TENANT_PWD)

Get list of all regions where C8 is available

url = FEDERATION_URL + "/_database/_api/datacenter/all"
dcl_resp = session.get(url)
dcl_list = json.loads(dcl_resp.text)
regions = []
for dcl in dcl_list:
    dcl_url = dcl['tags']['url']
    regions.append(dcl_url)

Create Document Collection

Create a test collection. Collection type = 2 for documents. Collection type = 3 for edges.

session = requests.Session()
session.auth = (GUEST_USER, GUEST_PWD)
auth = session.post(FEDERATION_URL)
url = FEDERATION_URL + "/_database/_tenant/{}/_db/{}/_api/collection".format(TENANT_NAME, GEO_FABRIC)
col = session.post(url, json={
    "name": COLLECTION_NAME,
    "type": 2
})

Create, Read, Update & Delete documents using C8QL

session = requests.Session()
session.auth = (GUEST_USER, GUEST_PWD)
auth = session.post(FABRIC_URL)
url = FEDERATION_URL + "/_database/_tenant/{}/_db/{}/_api/cursor".format(TENANT_NAME, GEO_FABRIC)

# Insert documents to the collection
col = session.post(url, json={
    "query": "INSERT{'name' : 'Julie', 'company' : 'ABC', '_key' : 'Julie'}" \
            "INTO testCollection"" 
})

# Read from the collection
col = session.post(url, json={
    "query": "FOR doc IN testCollection RETURN doc" 
})

# Update documents in the collection
col = session.post(url, json={
    "query": "FOR c IN testcollection UPDATE {'company':'XYZ'} IN testcollection"
})

# Delete documents in the collection
col = session.post(url, json={
    "query": "FOR c IN testcollection REMOVE c IN testcollection"
})

Create Geo-Replicated Stream

Create a persistent geo-replicated stream under the geo-fabric.

url = FEDARATION_URL + "/_tenant/{}/_db/{}/streams/persistent/stream/{}" \.format(TENANT_NAME, GEO_FABRIC, "stream1")
user_session = requests.Session()
user_session.auth = (GUEST_USER, GUEST_PWD)
stream = user_session.post(url)

Publish & Subscribe messages to the stream

Publish messages to the above created stream.

stream_url = 'wss://' + url + '/c8/_ws/ws/v2/producer/' + "persistent" + '/' + TENANT_NAME + '/' + FEDERATION_NAME \+ '.' + GEOFABRIC_NAME + '/' + "stream1"
ws = websocket.create_connection(stream_url)
ws.send(json.dumps({
    'payload': base64.b64encode('Hello World'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()

Subscribe to stream and print the messages.

stream_url = 'wss://' + url + '/c8/_ws/ws/v2/producer/' + "persistent" + '/' + TENANT_NAME + '/' + FEDERATION_NAME \ + '.' + GEOFABRIC_NAME + '/' + "stream1" + SUBSCRIPTION_NAME

ws = websocket.create_connection(stream_url)
while True:
    msg = json.loads(ws.recv())
    if not msg: break

    print("received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()

Recap

Your best friend when working with REST APIs is the REST API browser available in C8 GUI. From there, you can execute various rest apis and to exactly what the inputs and outputs are.

c8_rest_api_browser

Below is the complete program if you want to play around with it.

import base64
import json
import requests
import websocket

# Constants defined which are used throughout the tutorial.
FEDERATION_URL = "https://gdn1.macrometa.io"

TENANT_NAME = "tenant1"
TENANT_USER = "root"
TENANT_PWD = "tenant-pwd"

GUEST_USER = "guest-user"
GUEST_PWD = "guest-user-pwd"
GEO_FABRIC = `geofabric1`

COLLECTION_NAME = "testCollection"
SUBSCRIPTION_NAME = "my-sub"

# Create a https session
session = requests.Session()
session.auth = (TENANT_NAME, TENANT_USER, TENANT_PWD)

# Get list of regions where C8 federation is running
url = FEDERATION_URL + "/_database/_api/datacenter/all"
dcl_resp = session.get(url)
dcl_list = json.loads(dcl_resp.text)
regions = []
for dcl in dcl_list:
    dcl_url = dcl['tags']['url']
    regions.append(dcl_url)

# Create guest user by passing in username and password
session = requests.Session()
session.auth = (TENANT_NAME, TENANT_USER, TENANT_PWD)
url = FEDERATION_URL + "/_database/_tenant/{}/_db/_system/_admin/user".format(TENANT_NAME)
user = session.post(url=url, json={
    "active": True,
    "extra": {},
    "passwd": GUEST_PWD,
    "user": GUEST_USER
})

# Create a geo fabric and specify regions to be replicated.
# Also assign permissions on the database to the guest user
session = requests.Session()
session.auth = (TENANT_USER, TENANT_PWD)
auth = session.post(FEDERATION_URL)

url = FEDERATION_URL + "/_database/_tenant/{}/_db/_system/_api/database".format(TENANT_NAME)
db = session.post(url, json={
    "name": GEO_FABRIC,
    "options": {
        "dcList": regions,
        "realTime": True
    },
    "users": [
        {
            "active": True,
            "extra": {},
            "passwd": GUEST_PWD,
            "username": GUEST_USER
        }
    ]
})

# Create a test collection. collection type - 2 for document, 3 for edge
session = requests.Session()
session.auth = (GUEST_USER, GUEST_PWD)
auth = session.post(FEDERATION_URL)
url = FABRIC_URL + "/_database/_tenant/{}/_db/{}/_api/collection".format(TENANT_NAME, GEO_FABRIC)
col = session.post(url, json={
    "name": COLLECTION_NAME,
    "type": 2
})

# Using C8QL do CRUD operations on the collection.
session = requests.Session()
session.auth = (GUEST_USER, GUEST_PWD)
auth = session.post(FABRIC_URL)
url = FABRIC_URL + "/_database/_tenant/{}/_db/{}/_api/cursor".format(TENANT_NAME, GEO_FABRIC)

# Read from the collection
col = session.post(url, json={
    "query": "FOR c IN testCollection RETURN c"  # Read from database
})

# Deleting from collection
col = session.post(url, json={
    "query": "FOR c IN testCollection REMOVE c IN testCollection"
})

# Create persistent global stream
url = FABRIC_URL + "/_tenant/{}/_db/{}/streams/persistent/stream/{}" \.format(TENANT_NAME, DB_NAME, "stream1")
user_session = requests.Session()
user_session.auth = (GUEST_USER, GUEST_PWD)
persistent_stream = user_session.post(url)

# Publish messages to a stream
stream_url = 'wss://' + url + '/c8/_ws/ws/v2/producer/' + "persistent" + '/' + TENANT_NAME + '/' + FABRIC_NAME \+ '.' + GEO_FABRIC + '/' + "stream1"
ws = websocket.create_connection(stream_url)
ws.send(json.dumps({
    'payload': base64.b64encode('Hello World'),
    'properties': {
        'key1': 'value1',
        'key2': 'value2'
    },
    'context': 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
    print('Message published successfully')
else:
    print('Failed to publish message:', response)
ws.close()

# Subscribe to a stream
stream_url = 'wss://' + url + '/c8/_ws/ws/v2/consumer/' + "persistent" + '/' + TENANT_NAME + '/' + FABRIC_NAME \ + '.' + DB_NAME + '/' + "stream1" + SUBSCRIPTION_NAME
ws = websocket.create_connection(stream_url)
while True:
    msg = json.loads(ws.recv())
    if not msg: break
    print("received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
    # Acknowledge successful processing
    ws.send(json.dumps({'messageId': msg['messageId']}))

ws.close()
Was this article helpful?