Creating a Custom HTTP Source Connector for Kafka

Introduction

Apache Kafka has become the backbone of modern data pipelines, enabling real-time data streaming at scale. While Kafka provides many built-in connectors through its Connect API, sometimes you need to create custom connectors to meet specific requirements. In this post, I’ll walk through creating a custom HTTP source connector that pulls data from REST APIs into Kafka topics.

Why Build a Custom HTTP Connector?

There are several existing HTTP connectors for Kafka, but you might need a custom one when:

  1. You need specialized authentication mechanisms
  2. The API response requires complex parsing
  3. You need custom error handling or retry logic
  4. The existing connectors don’t support your specific API format
  5. You need to implement unique polling strategies

Prerequisites

Before we begin, ensure you have:

  • Java 8+ installed
  • Maven for dependency management
  • Kafka cluster (or local installation)
  • Basic understanding of Kafka Connect framework

Step 1: Set Up the Project Structure

Create a new Maven project with the following structure:

http-source-connector/
├── src/
│   ├── main/
│   │   ├── java/com/yourcompany/kafka/connect/http/
│   │   │   ├── HttpSourceConnector.java
│   │   │   ├── HttpSourceTask.java
│   │   │   ├── HttpSourceConfig.java
│   │   │   └── HttpAPIClient.java
│   │   └── resources/
│   │       └── http-connector.properties
│   └── test/
│       └── java/com/yourcompany/kafka/connect/http/
└── pom.xml



Step 2: Add Dependencies to pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
</dependencies>
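
The connect-api artifact is already on the Connect worker's classpath, but httpclient has to be shipped with your connector. One common approach, assumed for the rest of this walkthrough, is to bundle dependencies into a single JAR with the maven-shade-plugin (copying the dependency JARs into the connector's plugin-path directory also works):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>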

Step 3: Implement the Configuration Class

Create HttpSourceConfig.java to define your connector’s configuration:

public class HttpSourceConfig extends AbstractConfig {
    // Configuration keys (these match the properties used when deploying the connector)
    public static final String HTTP_URL = "http.url";
    private static final String HTTP_URL_DOC = "HTTP endpoint to poll for data";
    public static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Kafka topic to write the HTTP responses to";
    public static final String POLL_INTERVAL_MS = "poll.interval.ms";
    private static final String POLL_INTERVAL_MS_DOC = "How often to poll the endpoint, in milliseconds";
    public static final String HTTP_HEADERS = "http.headers";
    private static final String HTTP_HEADERS_DOC = "Optional request headers as comma-separated key:value pairs";
    public static ConfigDef config() {
        return new ConfigDef()
            .define(HTTP_URL, Type.STRING, Importance.HIGH, HTTP_URL_DOC)
            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
            .define(POLL_INTERVAL_MS, Type.LONG, 5000L, Importance.MEDIUM, POLL_INTERVAL_MS_DOC)
            .define(HTTP_HEADERS, Type.STRING, "", Importance.LOW, HTTP_HEADERS_DOC);
    }
    public HttpSourceConfig(Map<String, String> parsedConfig) {
        super(config(), parsedConfig);
    }
}

Step 4: Implement the Connector Class

Create HttpSourceConnector.java:

public class HttpSourceConnector extends SourceConnector {
    private Map<String, String> configProps;
    @Override
    public String version() {
        return "1.0";
    }
    @Override
    public void start(Map<String, String> props) {
        // Keep the connector configuration so it can be handed to each task
        configProps = new HashMap<>(props);
    }
    @Override
    public Class<? extends Task> taskClass() {
        return HttpSourceTask.class;
    }
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task receives the same configuration
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(configProps);
        }
        return taskConfigs;
    }
    @Override
    public void stop() {
        // Nothing to clean up at the connector level
    }
    @Override
    public ConfigDef config() {
        return HttpSourceConfig.config();
    }
}

Step 5: Implement the Task Class

Create HttpSourceTask.java:

public class HttpSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);
    private CloseableHttpClient httpClient;
    private String url;
    private String topic;
    private long pollInterval;
    private long lastPollTime = 0L;
    @Override
    public String version() {
        return "1.0";
    }
    @Override
    public void start(Map<String, String> props) {
        // Read the task configuration and create a reusable HTTP client
        HttpSourceConfig config = new HttpSourceConfig(props);
        url = config.getString(HttpSourceConfig.HTTP_URL);
        topic = config.getString(HttpSourceConfig.TOPIC_CONFIG);
        pollInterval = config.getLong(HttpSourceConfig.POLL_INTERVAL_MS);
        httpClient = HttpClients.createDefault();
    }
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        long currentTime = System.currentTimeMillis();
        // Wait until the configured poll interval has elapsed since the last request
        if (currentTime - lastPollTime < pollInterval) {
            TimeUnit.MILLISECONDS.sleep(pollInterval - (currentTime - lastPollTime));
        }
        try {
            HttpGet request = new HttpGet(url);
            String response = httpClient.execute(request, httpResponse ->
                EntityUtils.toString(httpResponse.getEntity()));
            lastPollTime = System.currentTimeMillis();
            SourceRecord record = new SourceRecord(
                Collections.singletonMap("url", url),  // source partition
                Collections.singletonMap("timestamp", currentTime),  // source offset
                topic,  // target topic
                null,  // partition (null for default)
                null,  // key schema
                null,  // key
                null,  // value schema (schemaless; the raw response string)
                response  // value
            );
            return Collections.singletonList(record);
        } catch (IOException e) {
            // Log and return null so Connect simply retries on the next poll cycle
            log.error("Error polling HTTP endpoint", e);
            return null;
        }
    }
    @Override
    public void stop() {
        // Release the HTTP client when the task shuts down
        try {
            if (httpClient != null) {
                httpClient.close();
            }
        } catch (IOException e) {
            log.warn("Error closing HTTP client", e);
        }
    }
}
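
Before packaging the connector, you can sanity-check the task outside a Connect worker by driving it directly. A minimal sketch, assuming the classes above compile and that you substitute a reachable JSON endpoint for the placeholder URL (the test class name is illustrative):

// Place alongside HttpSourceTask (e.g., under src/test/java in the same package)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class HttpSourceTaskSmokeTest {
    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("http.url", "https://api.example.com/data"); // replace with a real endpoint
        props.put("topic", "http_data");
        props.put("poll.interval.ms", "1000");
        HttpSourceTask task = new HttpSourceTask();
        task.start(props);
        try {
            // Poll once and print the records the task would hand to Kafka Connect
            List<SourceRecord> records = task.poll();
            if (records != null) {
                records.forEach(r -> System.out.println(r.value()));
            }
        } finally {
            task.stop();
        }
    }
}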

Step 6: Build and Package the Connector

Run the following Maven command to build the connector:

mvn clean package

This will create a JAR file in the target directory.

Step 7: Deploy the Connector

To deploy your custom connector:

  1. Copy the JAR file to a directory on your Kafka Connect worker's plugin.path (for example /usr/share/java/kafka-connect in many installations)
  2. Restart your Kafka Connect worker
  3. Create a connector configuration file (e.g., http-connector.properties):
name=http-source-connector
connector.class=com.yourcompany.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.url=https://api.example.com/data
topic=http_data
poll.interval.ms=60000
  4. Start the connector using the Kafka Connect REST API.
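For example, assuming the Connect worker's REST interface is listening on its default port 8083 (the REST API takes a JSON payload rather than the properties format above):

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "http-source-connector",
    "config": {
      "connector.class": "com.yourcompany.kafka.connect.http.HttpSourceConnector",
      "tasks.max": "1",
      "http.url": "https://api.example.com/data",
      "topic": "http_data",
      "poll.interval.ms": "60000"
    }
  }'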

Advanced Considerations

  1. Error Handling: Implement robust error handling for HTTP timeouts, rate limiting, etc.
  2. Authentication: Add support for OAuth, API keys, or other auth mechanisms.
  3. Pagination: Handle paginated API responses.
  4. Schema Support: Add support for Avro or JSON schemas.
  5. Metrics: Add metrics collection to monitor connector performance.
  6. Backpressure: Implement backpressure handling for high-volume APIs.
  7. Incremental Loading: Track offsets to only fetch new data (see the sketch after this list).
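
To illustrate the last point: Kafka Connect persists the source offsets attached to each SourceRecord, and a task can read them back through its SourceTaskContext on startup so it only requests data it has not seen. A minimal sketch of what that could look like inside the HttpSourceTask from Step 5, assuming a lastFetchedTimestamp field introduced here and a hypothetical since query parameter on the API:

// Inside HttpSourceTask: track the newest data already fetched
private long lastFetchedTimestamp = 0L;

@Override
public void start(Map<String, String> props) {
    // ... existing setup from Step 5 ...
    // Recover the offset previously committed for this source partition so the
    // task resumes where it left off after a restart or rebalance
    Map<String, Object> storedOffset = context.offsetStorageReader()
            .offset(Collections.singletonMap("url", url));
    if (storedOffset != null && storedOffset.get("timestamp") instanceof Long) {
        lastFetchedTimestamp = (Long) storedOffset.get("timestamp");
    }
}

private HttpGet buildIncrementalRequest() {
    // "since" is a hypothetical query parameter; use whatever filter your API supports
    return new HttpGet(url + "?since=" + lastFetchedTimestamp);
}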

Conclusion

Building a custom HTTP source connector for Kafka gives you complete control over how data flows from REST APIs into your Kafka topics. While this example provides a basic implementation, you can extend it to handle more complex scenarios specific to your use case.

Remember to thoroughly test your connector under various failure scenarios and monitor its performance in production. The Kafka Connect framework provides a solid foundation, allowing you to focus on the business logic of your data integration needs.

Ready to Streamline Your Data Pipelines?

If you’re looking to implement custom Kafka connectors or build robust data streaming solutions, Alephys can help you architect the right system tailored to your business needs and guide you through the process.

Whether you’re integrating complex APIs, optimizing data flow performance, or designing an enterprise-scale streaming architecture, our team of data experts will handle the technical heavy lifting. We help you unlock the full potential of real-time data while you focus on driving business value.
Author: Siva Munaga, Solution Architect at Alephys. I specialize in building scalable data infrastructure and streaming solutions that power modern applications. Let’s connect on LinkedIn to discuss your Kafka and data integration challenges!
