Getting Started
  • Updated on 25 Jun 2019
  • 4 minutes to read
  • Contributors
  • Print
  • Share
  • Dark
    Light

Getting Started

  • Print
  • Share
  • Dark
    Light

pyC8_logo

The python driver documentation is also available at pyc8.readthedocs.io

Features

  • Clean Pythonic interface
  • Lightweight

Compatibility

pyC8 requires Python 3.5+, and Python 3.6 or higher is recommended

Installation

To install a stable version from PyPi_:

~$ pip install pyC8

You may need to use sudo depending on your environment.

or, if you prefer to use conda:

conda install -c conda-forge pyC8

or pipenv:

pipenv install --pre pyC8

Once the installation process is finished, you can begin developing C8 fast data applications in Python.

Getting Started

Here is an example showing how pyC8 client can be used:

   from c8 import C8Client
   import time
   import warnings
   warnings.filterwarnings("ignore")

   region = "try.macrometa.io"
   demo_tenant = "demo"
   demo_fabric = "_system"
   demo_user = "demo"
   demo_collection = "employees"
   demo_stream = "demostream"

   client = C8Client(protocol='https', host=region, port=443)
   
   print("Create and populate employees collection in demofabric...")
   fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxxx')
   
   employees = fabric.create_collection('employees') # Create a new collection named "employees".
   employees.add_hash_index(fields=['email'], unique=True) # Add a hash index to the collection.

   employees.insert({'firstname': 'Jean', 'lastname':'Picard', 'email':'jean.picard@macrometa.io'})
   employees.insert({'firstname': 'James', 'lastname':'Kirk', 'email':'james.kirk@macrometa.io'})
   employees.insert({'firstname': 'Han', 'lastname':'Solo', 'email':'han.solo@macrometa.io'})
   employees.insert({'firstname': 'Bruce', 'lastname':'Wayne', 'email':'bruce.wayne@macrometa.io'})

   #------------------------------
   print("query employees collection...")
   
   cursor = fabric.c8ql.execute('FOR employee IN employees RETURN employee') # Execute a C8QL query
   docs = [document for document in cursor]
  
   print(docs)

   #------------------------------
   print("Create global & local streams in demofabric...")
   fabric.create_stream(demo_stream, local=False)
   fabric.create_stream(demo_stream, local=True)

   streams = fabric.streams()
   print("streams:", streams)

Example to query a given fabric

  from c8 import C8Client
  import json
  import warnings
  warnings.filterwarnings("ignore")

  region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "demo"
   
  #----------------------------
  print("query employees collection...")

  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
 
 #get fabric details
  fabric.fabrics_detail()
  cursor = fabric.c8ql.execute('FOR employee IN employees RETURN employee') # Execute a C8QL query
  docs = [document for document in cursor]
  print(docs)

Example for real-time updates from a collection in fabric

  from c8 import C8Client
  import warnings
  warnings.filterwarnings("ignore")

  region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "demo"
  
  def callback_fn(event):
      print(event)

  #------------------------------------
  print("Subscribe to employees collection...")
  
  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
  fabric.on_change("employees", callback=callback_fn)
  

Example to publish documents to a stream


  from c8 import C8Client
  import time
  import warnings
  warnings.filterwarnings("ignore")

   region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "demo"

  #--------------------------------------------------------------
  print("publish messages to stream...")
  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
  stream = fabric.stream()
  producer = stream.create_producer("demostream", local=False)
  for i in range(10):
      msg = "Hello from " + region + "("+ str(i) +")"
      producer.send(msg.encode('utf-8'))
      time.sleep(10) #sec
      

Example to subscribe documents from a stream

   from c8 import C8Client
   import warnings
   warnings.filterwarnings("ignore")

   region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "demo"

   #-------------------------------------
  print("consume messages from stream...")
   
  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
   
  stream_collection = fabric.stream()
  subscriber = stream_collection.subscribe("demostream",local=False, subscription_name="demosub", consumer_type= stream_collection.CONSUMER_TYPES.EXCLUSIVE)
  
   # you can subscribe using consumer_types option.
   
   for i in range(10):
       msg = subscriber.receive()
       print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
       subscriber.acknowledge(msg)

Example on stream management

    #get_stream_stats
    stream_collection.get_stream_stats('demostream', local=False) #for global persistent stream

    #Skip all messages on a stream subscription
    stream_collection.skip_all_messages_for_subscription('demostream', 'demosub')

    #Skip num messages on a topic subscription
    stream_collection.skip_messages_for_subscription('demostream', 'demosub', 10)

    #Expire messages for a given subscription of a stream.
    #expire time is in seconds
    stream_collection.expire_messages_for_subscription('demostream', 'demosub', 2)

    #Expire messages on all subscriptions of stream
    stream_collection.expire_messages_for_subscriptions('demostream',2)

    #Reset subscription to message position to closest timestamp
    #time is in milli-seconds
    stream_collection.reset_message_subscription_by_timestamp('demostream','demosub', 5)

    #Reset subscription to message position closest to given position
    #stream_collection.reset_message_for_subscription('demostream', 'demosub')

    #stream_collection.reset_message_subscription_by_position('demostream','demosub', 4)

    #trigger compaction status
    stream_collection.put_stream_compaction_status('demostream')

    #get stream compaction status
    stream_collection.get_stream_compaction_status('demostream')

    #Unsubscribes the given subscription on all streams on a stream fabric
    stream_collection.unsubscribe('demosub')

    #delete subscription of a stream
    #stream_collection.delete_stream_subscription('demostream', 'demosub' , local=False)

Example on RESTQL

  from c8 import C8Client
  import json
  import warnings
  warnings.filterwarnings("ignore")

  region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "root"
  
  collection_name = "employees"
  query_name = "query1"
   
  #----------------------------
  print("create and save restql to query employees collection...")
  
  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
 
 restql = { "query": {"parameter": {},  "name": query_name, 
                     "value": "FOR doc IN %s RETURN doc" % collection_name} }
                 
fabric.save_restql(data)

# ----------------Execute restql----------------
print("Execute restql...")
response = fabric.execute_restql(query_name)
print(response)

# ----------------Get all restql----------------
print("Get all restql...")
response = tenant.get_all_restql()
print(response)

Example on Spot Collections

  from c8 import C8Client
  import json
  import warnings
  warnings.filterwarnings("ignore")

  region = "try.macrometa.io"
  demo_tenant = "demo"
  demo_fabric = "_system"
  demo_user = "root"
  
  collection_name = "employees2"
   
# ----------------Create spot collection ----------------
  print("Creating spot collection...")
  client = C8Client(protocol='https', host=region, port=443)
  fabric = client.fabric(tenant=demo_tenant, name=demo_fabric, username=demo_user, password='xxx')
  collection = fabric.create_collection(collection_name, spot_collection=True)

  # ----------------Insert document into spot collection----------------
  print("Inserting data into spot collection...")
  data = {"record1": {"name": "Gandalf", "role": "Wizard", "_key": "Gandalf"},
        "record2": {"name": "Frodo", "role": "Hobbit", "_key": "Frodo"}}
        
  collection.insert(data["record1"])
  collection.insert(data["record2"])

# ----------------Read documents from spot collection----------------
  print("Reading data from spot collection...")
  record1 = collection.get("Gandalf")
  print(record1)
  record2 = collection.get("Frodo")
  print(record2)

Was this article helpful?