Introduction
Apache Kafka has become the backbone of modern data pipelines, enabling real-time data streaming at scale. While the Kafka Connect ecosystem offers many ready-made connectors, sometimes you need to build a custom one to meet specific requirements. In this post, I’ll walk through creating a custom HTTP source connector that pulls data from REST APIs into Kafka topics.
Why Build a Custom HTTP Connector?
There are several existing HTTP connectors for Kafka, but you might need a custom one when:
- You need specialized authentication mechanisms
- The API response requires complex parsing
- You need custom error handling or retry logic
- The existing connectors don’t support your specific API format
- You need to implement unique polling strategies
Prerequisites
Before we begin, ensure you have:
- Java 8+ installed
- Maven for dependency management
- Kafka cluster (or local installation)
- Basic understanding of Kafka Connect framework
Step 1: Set Up the Project Structure
Create a new Maven project with the following structure:
http-source-connector/
├── src/
│ ├── main/
│ │ ├── java/com/yourcompany/kafka/connect/http/
│ │ │ ├── HttpSourceConnector.java
│ │ │ ├── HttpSourceTask.java
│ │ │ ├── HttpSourceConfig.java
│ │ │ └── HttpAPIClient.java
│ │ └── resources/
│ │ └── http-connector.properties
│ └── test/
│ └── java/com/yourcompany/kafka/connect/http/
└── pom.xml
Step 2: Add Dependencies to pom.xml
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
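One optional refinement: the Connect worker already provides connect-api on its own classpath, so that dependency can be marked as provided to keep it out of your packaged artifact (a common convention, not a requirement):
<!-- Optional: the Connect runtime supplies connect-api, so it need not be bundled. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>2.8.0</version>
  <scope>provided</scope>
</dependency>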
Step 3: Implement the Configuration Class
Create HttpSourceConfig.java to define your connector’s configuration:
package com.yourcompany.kafka.connect.http;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import java.util.Map;
public class HttpSourceConfig extends AbstractConfig {
public static final String HTTP_URL = "http.url";
private static final String HTTP_URL_DOC = "The URL to poll data from";
public static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Topic to write to";
public static final String POLL_INTERVAL_MS = "poll.interval.ms";
private static final String POLL_INTERVAL_MS_DOC = "Interval in milliseconds to poll the HTTP URL";
public static final String HTTP_HEADERS = "http.headers";
private static final String HTTP_HEADERS_DOC = "Headers to include in the HTTP request";
public static ConfigDef config() {
return new ConfigDef()
.define(HTTP_URL, Type.STRING, Importance.HIGH, HTTP_URL_DOC)
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
.define(POLL_INTERVAL_MS, Type.LONG, 5000, Importance.MEDIUM, POLL_INTERVAL_MS_DOC)
.define(HTTP_HEADERS, Type.STRING, "", Importance.LOW, HTTP_HEADERS_DOC);
}
public HttpSourceConfig(Map<String, String> parsedConfig) {
super(config(), parsedConfig);
}
}
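The http.headers value arrives as a single string, so the task needs a convention for splitting it. Below is a minimal parsing sketch assuming a comma-separated "Name:Value" format; the HeaderParser class name and the format itself are illustrative choices for this post, not something the Connect API prescribes.
// Hypothetical helper: parses "Accept:application/json,X-Api-Key:abc123" into a map.
// The comma/colon convention is an assumption of this example.
import java.util.HashMap;
import java.util.Map;

public class HeaderParser {

    public static Map<String, String> parse(String headerConfig) {
        Map<String, String> headers = new HashMap<>();
        if (headerConfig == null || headerConfig.trim().isEmpty()) {
            return headers;
        }
        for (String pair : headerConfig.split(",")) {
            String[] parts = pair.split(":", 2);
            if (parts.length == 2) {
                headers.put(parts[0].trim(), parts[1].trim());
            }
        }
        return headers;
    }
}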
Step 4: Implement the Connector Class
Create HttpSourceConnector.java:
package com.yourcompany.kafka.connect.http;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HttpSourceConnector extends SourceConnector {
private Map<String, String> configProps;
@Override
public void start(Map<String, String> props) {
configProps = props;
}
@Override
public Class<? extends Task> taskClass() {
return HttpSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
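// Each task receives the same configuration; a single HTTP endpoint is not
// easily partitioned, so tasks.max=1 is the typical setting for this connector.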
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(configProps);
}
return taskConfigs;
}
@Override
public void stop() {
// Clean up resources if needed
}
@Override
public ConfigDef config() {
return HttpSourceConfig.config();
}
@Override
public String version() {
return "1.0";
}
}
Step 5: Implement the Task Class
Create HttpSourceTask.java:
package com.yourcompany.kafka.connect.http;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class HttpSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);
private HttpSourceConfig config;
private CloseableHttpClient httpClient;
private long lastPollTime;
private String url;
private String topic;
private long pollInterval;
@Override
public String version() {
return "1.0";
}
@Override
public void start(Map<String, String> props) {
config = new HttpSourceConfig(props);
httpClient = HttpClients.createDefault();
url = config.getString(HttpSourceConfig.HTTP_URL);
topic = config.getString(HttpSourceConfig.TOPIC_CONFIG);
pollInterval = config.getLong(HttpSourceConfig.POLL_INTERVAL_MS);
lastPollTime = 0;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
long currentTime = System.currentTimeMillis();
if (currentTime - lastPollTime < pollInterval) {
TimeUnit.MILLISECONDS.sleep(pollInterval - (currentTime - lastPollTime));
}
try {
HttpGet request = new HttpGet(url);
String response = httpClient.execute(request, httpResponse -> {
return EntityUtils.toString(httpResponse.getEntity());
});
lastPollTime = System.currentTimeMillis();
SourceRecord record = new SourceRecord(
Collections.singletonMap("url", url), // source partition
Collections.singletonMap("timestamp", currentTime), // source offset
topic, // target topic
null, // partition (null for default)
null, // key schema
null, // key
null, // value schema
response // value
);
return Collections.singletonList(record);
} catch (IOException e) {
log.error("Error polling HTTP endpoint", e);
return null;
}
}
@Override
public void stop() {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
log.error("Error closing HTTP client", e);
}
}
}
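The poll() implementation above publishes the entire response body as a single record. If the endpoint returns a JSON array and you would rather emit one record per element, the jackson-databind dependency declared in Step 2 can split it. The following is a sketch only: the toRecords method name is hypothetical, it reuses the url and topic fields from the task, and it assumes additional imports for com.fasterxml.jackson.databind.JsonNode, com.fasterxml.jackson.databind.ObjectMapper, and java.util.ArrayList.
// Sketch: emit one SourceRecord per JSON array element (hypothetical helper).
private List<SourceRecord> toRecords(String responseBody, long pollTimestamp) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode root = mapper.readTree(responseBody);
    List<SourceRecord> records = new ArrayList<>();
    if (root.isArray()) {
        for (JsonNode element : root) {
            records.add(new SourceRecord(
                Collections.singletonMap("url", url),
                Collections.singletonMap("timestamp", pollTimestamp),
                topic, null, null, null, null, element.toString()));
        }
    } else {
        // Fall back to publishing the whole body as one record.
        records.add(new SourceRecord(
            Collections.singletonMap("url", url),
            Collections.singletonMap("timestamp", pollTimestamp),
            topic, null, null, null, null, responseBody));
    }
    return records;
}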
Step 6: Build and Package the Connector
Run the following Maven command to build the connector:
mvn clean package
This will create a JAR file in the target directory.
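Note that this jar contains only your own classes, while the connector also needs HttpClient and Jackson on the plugin path at runtime. One common approach (optional, and the plugin version below is illustrative) is to bundle everything into a single jar with the Maven Shade plugin:
<!-- Optional pom.xml addition: bundle runtime dependencies into one jar. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>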
Step 7: Deploy the Connector
To deploy your custom connector:
- Copy the JAR file to your Kafka Connect plugin path (usually /usr/share/java/kafka-connect or similar)
- Restart your Kafka Connect worker
- Create a connector configuration file (e.g., http-connector.properties):
name=http-source-connector
connector.class=com.yourcompany.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.url=https://api.example.com/data
topic=http_data
poll.interval.ms=60000
- Start the connector using the Kafka Connect REST API:
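In distributed mode the REST API expects the configuration as JSON rather than a properties file. A sketch of the request, assuming the worker listens on the default port 8083:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "http-source-connector",
    "config": {
      "connector.class": "com.yourcompany.kafka.connect.http.HttpSourceConnector",
      "tasks.max": "1",
      "http.url": "https://api.example.com/data",
      "topic": "http_data",
      "poll.interval.ms": "60000"
    }
  }'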
Advanced Considerations
- Error Handling: Implement robust error handling for HTTP timeouts, rate limiting, etc.
- Authentication: Add support for OAuth, API keys, or other auth mechanisms.
- Pagination: Handle paginated API responses.
- Schema Support: Add support for Avro or JSON schemas.
- Metrics: Add metrics collection to monitor connector performance.
- Backpressure: Implement backpressure handling for high-volume APIs.
- Incremental Loading: Track offsets so that only new data is fetched (see the sketch below).
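To illustrate the last point: Kafka Connect persists the source offsets you attach to each SourceRecord, and a task can read them back when it starts. A minimal sketch that would extend the start() method from Step 5, assuming the same "url" partition key and "timestamp" offset key used in poll(); context is the SourceTaskContext the framework injects into every task.
// Sketch: recover the last committed offset inside start() (extends Step 5).
Map<String, Object> offset = context.offsetStorageReader()
    .offset(Collections.singletonMap("url", url));
if (offset != null && offset.get("timestamp") instanceof Long) {
    lastPollTime = (Long) offset.get("timestamp");
    // Use lastPollTime to fetch only newer data, e.g. via a "since" query
    // parameter if the API supports one (an assumption about the API).
}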
Conclusion
Building a custom HTTP source connector for Kafka gives you complete control over how data flows from REST APIs into your Kafka topics. While this example provides a basic implementation, you can extend it to handle more complex scenarios specific to your use case.
Remember to thoroughly test your connector under various failure scenarios and monitor its performance in production. The Kafka Connect framework provides a solid foundation, allowing you to focus on the business logic of your data integration needs.
Ready to Streamline Your Data Pipelines?
If you’re looking to implement custom Kafka connectors or build robust data streaming solutions, Alephys can help you architect a system tailored to your business needs and guide you through the process.
Whether you’re integrating complex APIs, optimizing data flow performance, or designing an enterprise-scale streaming architecture, our team of data experts will handle the technical heavy lifting. We help you unlock the full potential of real-time data while you focus on driving business value.
Author: Siva Munaga, Solution Architect at Alephys. I specialize in building scalable data infrastructure and streaming solutions that power modern applications. Let’s connect on LinkedIn to discuss your Kafka and data integration challenges!