Confluent Kafka Python example


Confluent's Python Client for Apache Kafka™

confluent-kafka-python provides a high-level Producer, Consumer and AdminClient compatible with all Apache Kafka™ brokers >= v0.8, Confluent Cloud and the Confluent Platform. The client is:

  • Reliable - It's a wrapper around librdkafka (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using the same set of system tests as the Java client and more. It's supported by Confluent.

  • Performant - Performance is a key design consideration. Maximum throughput is on par with the Java client for larger message sizes (where the overhead of the Python interpreter has less impact). Latency is on par with the Java client.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

See the API documentation for more info.

Below are some examples of typical usage. For more examples, see the examples directory or the confluentinc/examples github repo for a Confluent Cloud example.

Producer

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

High-level Consumer

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

AvroProducer

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     { "name" : "name", "type" : "string" }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     { "name" : "name", "type" : "string" }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
}, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

AvroConsumer

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

AdminClient

Create topics:

from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = a.create_topics(new_topics)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Thread Safety

The Producer, Consumer and AdminClient are all thread safe.

Install self-contained binary wheels
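The install command itself did not survive extraction; installing the pre-built wheels from PyPI is normally just:

pip install confluent-kafka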

NOTE: The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support. If you need SASL Kerberos/GSSAPI support you must install librdkafka and its dependencies using the repositories below and then build confluent-kafka using the command in the "Install from source from PyPi" section below.

Install AvroProducer and AvroConsumer
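The command was likewise lost in extraction; the Avro serializer support is published as a package extra, so the usual form is:

pip install "confluent-kafka[avro]"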

Install from source from PyPI (requires librdkafka + dependencies to be installed separately):
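The command the README normally shows for a source build (forcing pip to skip the binary wheels) is, to the best of our knowledge:

pip install --no-binary :all: confluent-kafka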

For source install, see Prerequisites below.

The Python client (as well as the underlying C library librdkafka) supports all broker versions >= 0.8. But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it is not safe for a client to assume what protocol version is actually supported by the broker, thus you will need to hint the Python client what protocol version it may use. This is done through two configuration settings:

  • broker.version.fallback (default 0.9.0.1)
  • api.version.request (default true)

When using a Kafka 0.10 broker or later you don't need to do anything (api.version.request=true is the default). If you use Kafka broker 0.9 or 0.8 you must set api.version.request=false and set broker.version.fallback to your broker version, e.g. broker.version.fallback=0.9.0.1.
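As a sketch of what that looks like in client configuration (the broker address is a placeholder), a client talking to a 0.9 broker could be configured like this:

from confluent_kafka import Producer

# Only needed when the broker is 0.8/0.9; 0.10+ brokers are detected automatically.
p = Producer({
    'bootstrap.servers': 'mybroker',
    'api.version.request': False,          # don't probe a broker that can't answer
    'broker.version.fallback': '0.9.0.1',  # tell librdkafka which protocol level to assume
})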

More info here: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility

If you're connecting to a Kafka cluster through SSL you will need to configure the client with security.protocol=ssl (or security.protocol=sasl_ssl if SASL authentication is used).

The client will use CA certificates to verify the broker's certificate. The embedded OpenSSL library will look for CA certificates in the default system locations. CA certificates are typically provided by the Linux distribution's ca-certificates package, which needs to be installed through apt, yum, et al.

If your system stores CA certificates in another location you will need to configure the client with ssl.ca.location pointing to that CA certificate bundle.

Alternatively, the CA certificates can be provided by the certifi Python package. To use certifi, add an import certifi line and configure the client's CA location with ssl.ca.location set to certifi.where().
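Putting the SSL-related settings together, a minimal sketch (hostname, port and the fallback path are placeholders) could look like:

import certifi
from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'mybroker:9093',
    'security.protocol': 'ssl',           # use 'sasl_ssl' if SASL authentication is also used
    'ssl.ca.location': certifi.where(),   # or a path to your distribution's CA bundle
})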

Prerequisites

  • Python >= 2.7 or Python 3.x
  • librdkafka >= 1.6.0 (latest release is embedded in wheels)

librdkafka is embedded in the macOS and manylinux wheels. For other platforms, for SASL Kerberos/GSSAPI support, or when a specific version of librdkafka is desired, follow these guidelines:

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by The Apache Software Foundation.

Instructions on building and testing confluent-kafka-python can be found here.

Source: https://github.com/confluentinc/confluent-kafka-python

Reading Data from a Kafka Topic using Confluent Kafka in Python

In this tutorial, you will learn how to read data from a Kafka topic in Python.

To read data from a Kafka topic, we will use Confluent Kafka, which is one of the best Python client libraries for Apache Kafka. It provides a high-level Producer, Consumer, and AdminClient.

An application that reads data from a Kafka topic is called a Consumer application.

The steps below show how to install the Confluent Kafka library and write code for a Kafka consumer application in Python:

Install Confluent Kafka

Install the Python Confluent Kafka Library:
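The install command did not survive extraction; it is normally:

pip install confluent-kafka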

Consumer

Consumer code to read data from a Kafka topic:
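The tutorial's code block was lost in extraction. A minimal consumer along the lines it describes might look like the sketch below (the broker address and group id are assumptions; the topic matches the test-topic created later):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test-topic'])

try:
    while True:
        msg = consumer.poll(1.0)        # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print('Consumer error: {}'.format(msg.error()))
            continue
        print('Received: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()                    # commit final offsets and leave the group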

The Kafka consumer application is now ready. Next, start the Kafka services and create a topic named test-topic on your local machine.

If you do not have a Kafka server set up on your local machine yet, see how to do it here for Windows and here for Mac/Ubuntu/Linux.

Source: https://www.tutorialsbuddy.com/confluent-kafka-python-consumer-example

Confluent's Kafka Python Client

Classes:

  • JSONSerializer
  • JSONDeserializer

Both contain a short piece of code that validates the instance against its schema using the "jsonschema" package.

This is not 100% correct: if you are using JSON Schemas that are, for instance, version "draft-07", you must use a different validator. See the documentation here: https://python-jsonschema.readthedocs.io/en/stable/validate/#versioned-validators

For instance, if you are using a JSON Draft-7 schema you must use:
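The code block was dropped during extraction; a short sketch of what the reporter appears to mean, using the jsonschema package directly (the schema is only an illustration), is:

from jsonschema import Draft7Validator

schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {"name": {"type": "string"}},
    "required": ["name"],
}

# Explicitly select the draft-07 validator rather than the package default.
Draft7Validator(schema).validate({"name": "example"})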

or you can use the "validate" but using the following property: "format"

To reproduce, produce data to Kafka using the JSONSerializer for the value (a Schema Registry is required). For instance, if you are using the topic "test", the schema must be registered under the "<topic>-value" subject (here "test-value"). You can use this schema, for instance:
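The reproduction snippet was also lost; below is a hedged sketch of producing JSON Schema data with the 1.7.0 client (broker and Schema Registry addresses, and the schema itself, are assumptions for illustration):

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import StringSerializer

schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "value",
  "type": "object",
  "properties": {"name": {"type": "string"}},
  "required": ["name"]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9094',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': JSONSerializer(schema_str, schema_registry_client),
})

# The value schema is registered/validated against the "test-value" subject.
producer.produce(topic='test', key='k1', value={'name': 'example'})
producer.flush()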

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.7.0, installed with "pip install confluent-kafka"
  • [ ] Apache Kafka broker version: confluentinc/cp-kafka:6.0.4 confluentinc/cp-zookeeper:5.5.0 confluentinc/cp-schema-registry:6.1.1
  • [ ] Client configuration: --bootstrap_servers "localhost:9094" Schema Registry conf:
  • [ ] Operating system: MacOS Big Sur 11.6
  • [ ] Provide client logs (with debug='..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue
Source: https://pythonrepo.com/repo/confluentinc-confluent-kafka-python-python-connecting-and-operating-databases

Kafka Python Client

Confluent develops and maintains confluent-kafka-python, a Python Client for Apache Kafka® that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.

Python Client installation

The client is available on PyPI and can be installed using pip:

pip install confluent-kafka

You can install it globally, or within a virtualenv.

Note

The confluent-kafka Python package is a binding on top of the C client librdkafka. It comes bundled with a pre-built version of librdkafka which does not include GSSAPI/Kerberos support. For information on how to install a version that supports GSSAPI, see the installation instructions.

Python Client demo code

For Hello World examples of Kafka clients in Python, see Python. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.

Kafka Producer

Initialization

The Producer is configured using a dictionary as follows:

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'client.id': socket.gethostname()}

producer = Producer(conf)

For information on the available configuration properties, refer to the API Documentation.

Asynchronous writes

To initiate sending a message to Kafka, call the produce() method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call will complete immediately and does not return a value. A BufferError will be thrown if the message could not be enqueued due to librdkafka's local produce queue being full.

producer.produce(topic, key="key", value="value")

To receive notification of delivery success or failure, you can pass a callback parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the produce() method enqueues the message immediately for batching, compression and transmission to the broker, no delivery notification events will be propagated until poll() is invoked.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)

Synchronous writes

The Python client provides a flush() method which can be used to make writes synchronous. This is typically a bad idea since it effectively limits throughput to the broker round trip time, but may be justified in some cases.

producer.produce(topic, key="key", value="value")
producer.flush()

Typically, flush() should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.

Kafka Consumer

Initialization

The Consumer is configured using a dictionary as follows:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest'}

consumer = Consumer(conf)

The group.id property is mandatory and specifies which consumer group the consumer is a member of. The auto.offset.reset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

This is another example, with enable.auto.commit configured to False in the consumer. The default value is True.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': "foo",
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

For information on the available configuration properties, refer to the API Documentation.

Python Client code examples

Basic poll loop

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the poll() method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer behind the scenes. Before entering the consume loop, you'll typically use the subscribe() method to specify which topics should be fetched from:

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%%%s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    running = False

The poll timeout is hard-coded to 1 second. If no records are received before this timeout expires, poll() returns None.

Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired.

Synchronous commits

The simplest and most reliable way to manually commit offsets is by setting the asynchronous parameter to False in the commit() method call. This method can also accept the mutually exclusive keyword parameters offsets, to explicitly list the offsets for each assigned topic partition, and message, which will commit offsets relative to a Message object returned by poll().

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%%%s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. The asynchronous flag controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly.

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must be a little careful with the commit failure.

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%%%s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

For simplicity in this example, commit() is called prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Asynchronous Commits

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%%%s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, the consumer sends the commit request and returns immediately by using asynchronous commits. The asynchronous parameter to commit() is changed to True. The value is passed in explicitly here, but asynchronous commits are the default if the parameter is not included.

The commit API gives you a callback which is invoked when the commit either succeeds or fails. The commit callback can be any callable and can be passed as the on_commit configuration parameter to the consumer constructor.

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'on_commit': commit_completed}

consumer = Consumer(conf)

API documentation

Click here to view the Python Client API documentation.


Source: https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html


Write Data to a Kafka Topic using Confluent Kafka in Python

In this tutorial, you will learn how to write data to a Kafka topic in Python.

To send data to a Kafka topic, we will use Confluent Kafka, which is one of the best Python client libraries for Apache Kafka. It provides a high-level Producer, Consumer, and AdminClient.

An application that writes data to a Kafka topic is called a Producer Application.

The steps below show how to install the Confluent Kafka library and write code for a Kafka producer application in Python:

Install Confluent Kafka

Install the latest version of Python Confluent Kafka Library:
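The install command did not survive extraction; installing (or upgrading to) the latest release from PyPI is normally:

pip install --upgrade confluent-kafka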

Producer

Producer code to send data to a Kafka topic:
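The tutorial's code block was lost in extraction. A minimal producer along the lines it describes might look like this sketch (the broker address is an assumption; the topic matches the test-topic created later):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure.
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for i in range(5):
    producer.poll(0)                     # serve earlier delivery callbacks
    producer.produce('test-topic',
                     'message {}'.format(i).encode('utf-8'),
                     callback=delivery_report)

producer.flush()                         # wait for outstanding deliveries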

The Kafka producer application is now ready. Next, start the Kafka services and create a topic named test-topic on your local machine.

If you do not have a Kafka server set up on your local machine yet, see how to do it here for Windows and here for Mac/Ubuntu/Linux.

Source: https://www.tutorialsbuddy.com/confluent-kafka-python-producer-example
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Example high-level Kafka 0.9 balanced Consumer
#
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from pprint import pformat


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def print_usage_and_exit(program_name):
    sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    broker = argv[0]
    group = argv[1]
    topics = argv[2:]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
            'auto.offset.reset': 'earliest'}

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = int(opt[1])

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()
Source: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py


Python confluent_kafka.Producer() Examples

The following are 30 code examples showing how to use confluent_kafka.Producer(). They are extracted from open source projects.


You may also want to check out all available functions and classes of the module confluent_kafka.

Example 1

def _initialize_members(self, topic, server, port, zk_server, zk_port, partitions):

    # get logger isinstance
    self._logger = logging.getLogger("SPOT.INGEST.KafkaProducer")

    # kafka requirements
    self._server = server
    self._port = port
    self._zk_server = zk_server
    self._zk_port = zk_port
    self._topic = topic
    self._num_of_partitions = partitions
    self._partitions = []
    self._partitioner = None
    self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)

    # create topic with partitions
    self._create_topic()

    self._kafka_conf = self._producer_config(self._kafka_brokers)

    self._p = Producer(**self._kafka_conf)

Example 2

def init():
    global log
    global kafka_producer

    if not log:
        log = create_logger(Config().get("logging"))

    if kafka_producer:
        raise Exception("XOSKafkaProducer already initialized")

    else:
        log.info(
            "Connecting to Kafka with bootstrap servers: %s"
            % Config.get("kafka_bootstrap_servers")
        )

        try:
            producer_config = {
                "bootstrap.servers": ",".join(Config.get("kafka_bootstrap_servers"))
            }

            kafka_producer = confluent_kafka.Producer(**producer_config)
            log.info("Connected to Kafka: %s" % kafka_producer)

        except confluent_kafka.KafkaError as e:
            log.exception("Kafka Error: %s" % e)

Example 3

def create_producer(self, retry_limit, buffering_max):
    console_out_many(["Creating producer with:",
                      f"   bootstrap.servers={self.broker_manager.get_bootstrap_servers()}",
                      f"   acks={self.acks_mode}",
                      f"   retries={retry_limit}",
                      f"   buffering={buffering_max}"], self.get_actor())

    self.producer = Producer({'bootstrap.servers': self.broker_manager.get_bootstrap_servers(),
                              'message.send.max.retries': retry_limit,
                              'queue.buffering.max.ms': buffering_max,
                              # 'queue.buffering.max.ms': 100,
                              # 'batch.num.messages': 1000,
                              # 'stats_cb': my_stats_callback,
                              # 'statistics.interval.ms': 100,
                              'metadata.max.age.ms': 60000,
                              'default.topic.config': {'request.required.acks': self.acks_mode}})

Example 4

def create_idempotent_producer(self, retry_limit, buffering_max):
    console_out_many(["Creating idempotent producer with:",
                      f"   bootstrap.servers={self.broker_manager.get_bootstrap_servers()}",
                      f"   acks={self.acks_mode}",
                      f"   retries={retry_limit}",
                      "   metadata.max.age.ms: 60000",
                      f"   buffering={buffering_max}"], self.get_actor())

    self.producer = Producer({'bootstrap.servers': self.broker_manager.get_bootstrap_servers(),
                              'message.send.max.retries': retry_limit,
                              'enable.idempotence': True,
                              'queue.buffering.max.ms': buffering_max,
                              # 'batch.num.messages': 1000,
                              # 'stats_cb': my_stats_callback,
                              # 'statistics.interval.ms': 100,
                              'metadata.max.age.ms': 60000,
                              'default.topic.config': {'request.required.acks': self.acks_mode}})

Example 5

def prepareProducer(self, groupID="pythonproducers"):
    options = {
        'bootstrap.servers': self.kafka_brokers,
        'group.id': groupID
    }
    # We need this test as local kafka does not expect SSL protocol.
    if (self.kafka_env != 'LOCAL'):
        options['security.protocol'] = 'SASL_SSL'
        options['sasl.mechanisms'] = 'PLAIN'
        options['sasl.username'] = 'token'
        options['sasl.password'] = self.kafka_apikey
    if (self.kafka_env == 'OCP'):
        options['ssl.ca.location'] = os.environ['PEM_CERT']
    print("[KafkaProducer] - This is the configuration for the producer:")
    print('[KafkaProducer] - {}'.format(options))
    self.producer = Producer(options)

Example 6

def test_ksql_create_stream_w_properties(self):
    """ Test GET requests """
    topic = self.exist_topic
    stream_name = self.test_prefix + "test_ksql_create_stream"
    stream_name = "test_ksql_create_stream"
    ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
    WITH (kafka_topic='{}', value_format='JSON');".format(stream_name, topic)
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    if 'TEST_KSQL_CREATE_STREAM' not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]['commandStatus']['status'], 'SUCCESS')

    producer = Producer({'bootstrap.servers': self.bootstrap_servers})
    producer.produce(self.exist_topic, '''{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}''')
    producer.flush()
    print()
    chunks = self.api_client.query("select * from {}".format(stream_name),
                                   stream_properties=streamProperties,
                                   idle_timeout=10)
    for chunk in chunks:
        pass

    assert json.loads(chunk)['row']['columns'][-1] == 'Palo Alto'

Example 7

def test_conf_none():
    """ Issue #133
    Test that None can be passed for NULL by setting bootstrap.servers
    to None. If None would be converted to a string then a broker would
    show up in statistics. Verify that it doesnt. """

    def stats_cb_check_no_brokers(stats_json_str):
        """ Make sure no brokers are reported in stats """
        global seen_stats_cb_check_no_brokers
        stats = json.loads(stats_json_str)
        assert len(stats['brokers']) == 0, "expected no brokers in stats: %s" % stats_json_str
        seen_stats_cb_check_no_brokers = True

    conf = {'bootstrap.servers': None,  # overwrites previous value
            'statistics.interval.ms': 10,
            'stats_cb': stats_cb_check_no_brokers}

    p = confluent_kafka.Producer(conf)
    p.poll(timeout=1)

    global seen_stats_cb_check_no_brokers
    assert seen_stats_cb_check_no_brokers

Example 8

def test_error_cb():
    """ Test the error callback. """
    global seen_all_brokers_down

    # Configure an invalid broker and make sure the ALL_BROKERS_DOWN
    # error is seen in the error callback.
    p = Producer({'bootstrap.servers': '127.0.0.1:1', 'socket.timeout.ms': 10,
                  'error_cb': error_cb})

    t_end = time.time() + 5

    while not seen_all_brokers_down and time.time() < t_end:
        p.poll(1)

    assert seen_all_brokers_down

Example 9

def test_fatal():
    """ Test fatal exceptions """
    # Configure an invalid broker and make sure the ALL_BROKERS_DOWN
    # error is seen in the error callback.
    p = Producer({'error_cb': error_cb})

    with pytest.raises(KafkaException) as exc:
        raise KafkaException(KafkaError(KafkaError.MEMBER_ID_REQUIRED,
                                        fatal=True))
    err = exc.value.args[0]
    assert isinstance(err, KafkaError)
    assert err.fatal()
    assert not err.retriable()
    assert not err.txn_requires_abort()

    p.poll(0)  # Need some p use to avoid flake8 unused warning

Example 10

def _get_kafka_producer(self):
    try:

        if self.kafka_endpoint.startswith('@'):
            try:
                _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
                                                       self.kafka_endpoint[1:])
                log.debug('found-kafka-service', endpoint=_k_endpoint)

            except Exception as e:
                log.exception('no-kafka-service-in-consul', e=e)

                self.kproducer = None
                self.kclient = None
                return
        else:
            _k_endpoint = self.kafka_endpoint

        self.kproducer = _kafkaProducer(
            {'bootstrap.servers': _k_endpoint}
        )
        pass
    except Exception, e:
        log.exception('failed-get-kafka-producer', e=e)
        return

Example 11

def Kafka(node, servers='', topic='', json=False, wrap=False):
    '''Connect to kafka server and send data

    Args:
        foo (callable): input stream
        foo_kwargs (dict): kwargs for the input stream
        servers (list): kafka bootstrap servers
        group (str): kafka group id
        topics (list): list of kafka topics to connect to
        json (bool): load input data as json
        wrap (bool): wrap result in a list
        interval (int): kafka poll interval
    '''
    p = Producer({'bootstrap.servers': servers})

    def _send(data, producer=p, topic=topic, json=json, wrap=wrap):
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)

        if wrap:
            data = [data]

        if json:
            data = JSON.dumps(data)

        producer.produce(topic, data.encode('utf-8'))
        return data

    ret = Node(foo=_send, name='Kafka', inputs=1, graphvizshape=_OUTPUT_GRAPHVIZSHAPE)
    node >> ret
    return ret

Example 12

def create_kafka_consumer(brokers_ips: str, extra_params: Dict):
    config = extra_params or dict()
    config.update({'bootstrap.servers': ",".join(brokers_ips)})
    return confluent_kafka.Producer(config)

Example 13

def __init__(self, config): # pragma: no cover """ Streaming client implementation based on Kafka. Configuration keys: KAFKA_ADDRESS KAFKA_CONSUMER_GROUP KAFKA_TOPIC TIMEOUT EVENT_HUB_KAFKA_CONNECTION_STRING """ self.logger = Logger() self.topic = config.get("KAFKA_TOPIC") if not self.topic: raise ValueError("KAFKA_TOPIC is not set in the config object.") if not config.get("KAFKA_ADDRESS"): raise ValueError("KAFKA_ADDRESS is not set in the config object.") if config.get("TIMEOUT"): try: self.timeout = int(config.get("TIMEOUT")) except ValueError: self.timeout = None else: self.timeout = None kafka_config = self.create_kafka_config(config) self.admin = admin.AdminClient(kafka_config) if config.get("KAFKA_CONSUMER_GROUP") is None: self.logger.info('Creating Producer') self.producer = Producer(kafka_config) self.run = False else: self.logger.info('Creating Consumer') self.consumer = Consumer(kafka_config) self.run = True signal.signal(signal.SIGTERM, self.exit_gracefully)

Example 14

def main(): """Main entry for script""" parser = _get_parser() args = parser.parse_args() sources = _get_sources(_get_items(args.items), args.limit) timestamp = pd.Timestamp(args.start) freq = pd.Timedelta(args.freq) logging.basicConfig(level=_VERBOSITY.get(args.verbosity, logging.DEBUG)) if args.broker_list is None: def _produce(timestamp, name, price): print('{},{},{}'.format(timestamp, name, price)) LOGGER.debug('Running in console mode') _run(sources, timestamp, freq, args.real_time, args.real_time_multiplier, _produce) else: if args.topic is None: raise ValueError('Must specify --topic when using Kafka') from confluent_kafka import Producer producer = Producer({'bootstrap.servers': args.broker_list}) def _produce(timestamp, name, price): data = '{},{},{}'.format(timestamp, name, price) produced = False while not produced: try: producer.produce(args.topic, value=data.encode('utf-8'), key=name) producer.poll(0) produced = True except BufferError: producer.poll(10) LOGGER.debug('Producing to %s on %s', args.topic, args.broker_list) _run(sources, timestamp, freq, args.real_time, args.real_time_multiplier, _produce) producer.flush()

Example 15

def producer(self):
    # TODO: Must set all config values applicable to a producer
    return kafka.Producer({'bootstrap.servers': self.config.BOOTSTRAP_SERVERS})

Example 16

def __init__(self):
    self.LOG_CLASS = "StoreLog"
    self.method = settings.LOG_TYPE
    if self.method == settings.STORE:
        self.conf = settings.STORE_CONF
    elif self.method == settings.TOSERVER:
        self.conf = settings.SERVER_CONF
    elif self.method == settings.TOKAFKA:
        self.producer = Producer({'bootstrap.servers': 'localhost:9092'})

Example 17

def init():
    global log
    global kafka_producer

    if not log:
        log = create_logger(Config().get("logging"))

    if kafka_producer:
        raise Exception("XOSKafkaProducer already initialized")

    else:
        log.info(
            "Connecting to Kafka with bootstrap servers: %s"
            % Config.get("kafka_bootstrap_servers")
        )

        try:
            producer_config = {
                "bootstrap.servers": ",".join(Config.get("kafka_bootstrap_servers"))
            }

            kafka_producer = confluent_kafka.Producer(**producer_config)
            log.info("Connected to Kafka: %s" % kafka_producer)

        except confluent_kafka.KafkaError as e:
            log.exception("Kafka Error: %s" % e)

Example 18

def __init__(self):
    super(TwitterStreamListener, self).__init__()
    self.producer = Producer(**KAFKA_CONF)
    self.count = 0
    self.tweets = []

Example 19

def setUp(self):
    self.url = "http://localhost:8088"
    self.api_client = KSQLAPI(url=self.url, check_version=False)
    self.test_prefix = "ksql_python_test"
    self.exist_topic = self.test_prefix + '_exist_topic'
    self.bootstrap_servers = 'localhost:29092'
    if utils.check_kafka_available(self.bootstrap_servers):
        producer = Producer({'bootstrap.servers': self.bootstrap_servers})
        producer.produce(self.exist_topic, "test_message")
        producer.flush()

Example 20

def setUp(self):
    self.url = "http://localhost:8088"
    self.api_client = KSQLAPI(url=self.url, check_version=False)
    self.test_prefix = "ksql_python_test"
    self.exist_topic = 'exist_topic'
    self.bootstrap_servers = 'localhost:29092'
    if utils.check_kafka_available(self.bootstrap_servers):
        producer = Producer({'bootstrap.servers': self.bootstrap_servers})
        producer.produce(self.exist_topic, "test_message")
        producer.flush()

Example 21

def __init__(self, configs, loop=None):
    self._loop = loop or asyncio.get_event_loop()
    self._producer = confluent_kafka.Producer(configs)
    self._cancelled = False
    self._poll_thread = Thread(target=self._poll_loop)
    self._poll_thread.start()

Example 22

def __init__(self, configs):
    self._producer = confluent_kafka.Producer(configs)
    self._cancelled = False
    self._poll_thread = Thread(target=self._poll_loop)
    self._poll_thread.start()

Example 23

def startup_event():
    global producer, aio_producer
    aio_producer = AIOProducer(config)
    producer = Producer(config)

Example 24

def throttle_cb_instantiate_fail():
    """ Ensure noncallables raise TypeError"""
    with pytest.raises(ValueError):
        confluent_kafka.Producer({'throttle_cb': 1})

Example 25

def test_topic_config_update(): # *NOTE* default.topic.config has been deprecated. # This example remains to ensure backward-compatibility until its removal. confs = [{"message.timeout.ms": 600000, "default.topic.config": {"message.timeout.ms": 1000}}, {"message.timeout.ms": 1000}, {"default.topic.config": {"message.timeout.ms": 1000}}] def on_delivery(err, msg): # Since there is no broker, produced messages should time out. global seen_delivery_cb seen_delivery_cb = True assert err.code() == confluent_kafka.KafkaError._MSG_TIMED_OUT for conf in confs: p = confluent_kafka.Producer(conf) start = time.time() timeout = start + 10.0 p.produce('mytopic', value='somedata', key='a key', on_delivery=on_delivery) while time.time() < timeout: if seen_delivery_cb: return p.poll(1.0) if "CI" in os.environ: pytest.xfail("Timeout exceeded") pytest.fail("Timeout exceeded")

Example 26

def dr_cb(self, err, msg):
    """ Producer delivery report callback """
    if err is not None:
        self.logger.warning("producer: delivery failed: {} [{}]: {}".format(msg.topic(), msg.partition(), err))
        self.dr_err_cnt += 1
    else:
        self.dr_cnt += 1
        if (self.dr_cnt % self.disprate) == 0:
            self.logger.debug("producer: delivered message to {} [{}] at offset {}".format(
                msg.topic(), msg.partition(), msg.offset()))

Example 27

def producer_run(self): """ Producer main loop """ sleep_intvl = 1.0 / self.rate self.producer_msgid = 0 self.dr_cnt = 0 self.dr_err_cnt = 0 self.producer_error_cb_cnt = 0 next_stats = time.time() + 10 while self.run: self.produce_record() now = time.time() t_end = now + sleep_intvl while True: if now > next_stats: self.producer_stats() next_stats = now + 10 remaining_time = t_end - now if remaining_time < 0: remaining_time = 0 self.producer.poll(remaining_time) if remaining_time <= 0: break now = time.time() remaining = self.producer.flush(30) self.logger.warning("producer: {} message(s) remaining in queue after flush()".format(remaining)) self.producer_stats()

Example 28

def producer_error_cb(self, err):
    """ Producer error callback """
    self.logger.error("producer: error_cb: {}".format(err))
    self.producer_error_cb_cnt += 1

Example 29

def __init__(self, topic, rate, conf): """ SoakClient constructor. conf is the client configuration """ self.topic = topic self.rate = rate self.disprate = int(rate * 10) self.run = True self.stats_cnt = {'producer': 0, 'consumer': 0} self.start_time = time.time() self.logger = logging.getLogger('soakclient') self.logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s')) self.logger.addHandler(handler) # Create topic (might already exist) self.create_topic(self.topic, conf) # # Create Producer and Consumer, each running in its own thread. # conf['stats_cb'] = self.stats_cb conf['statistics.interval.ms'] = 10000 # Producer conf['error_cb'] = self.producer_error_cb self.producer = Producer(conf) # Consumer conf['error_cb'] = self.consumer_error_cb conf['on_commit'] = self.consumer_commit_cb self.logger.info("consumer: using group.id {}".format(conf['group.id'])) self.consumer = Consumer(conf) self.producer_thread = threading.Thread(target=self.producer_thread_main) self.producer_thread.start() self.consumer_thread = threading.Thread(target=self.consumer_thread_main) self.consumer_thread.start()

Example 30

def terminate(self):
    """ Terminate Producer and Consumer """
    soak.logger.info("Terminating (ran for {}s)".format(time.time() - self.start_time))
    self.run = False
    self.producer_thread.join()
    self.consumer_thread.join()
Source: https://www.programcreek.com/python/example/119963/confluent_kafka.Producer

