Creating a Custom HTTP Source Connector for Kafka

Introduction

Apache Kafka has become the backbone of modern data pipelines, enabling real-time data streaming at scale. While Kafka provides many built-in connectors through its Connect API, sometimes you need to create custom connectors to meet specific requirements. In this post, I’ll walk through creating a custom HTTP source connector that pulls data from REST APIs into Kafka topics.

Why Build a Custom HTTP Connector?

There are several existing HTTP connectors for Kafka, but you might need a custom one when:

  1. You need specialized authentication mechanisms
  2. The API response requires complex parsing
  3. You need custom error handling or retry logic
  4. The existing connectors don’t support your specific API format
  5. You need to implement unique polling strategies

Prerequisites

Before we begin, ensure you have:

  • Java 8+ installed
  • Maven for dependency management
  • Kafka cluster (or local installation)
  • Basic understanding of Kafka Connect framework

Step 1: Set Up the Project Structure

Create a new Maven project with the following structure:

http-source-connector/
├── src/
│   ├── main/
│   │   ├── java/com/yourcompany/kafka/connect/http/
│   │   │   ├── HttpSourceConnector.java
│   │   │   ├── HttpSourceTask.java
│   │   │   ├── HttpSourceConfig.java
│   │   │   └── HttpAPIClient.java
│   │   └── resources/
│   │       └── http-connector.properties
│   └── test/
│       └── java/com/yourcompany/kafka/connect/http/
└── pom.xml

Step 2: Add Dependencies to pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

Step 3: Implement the Configuration Class

Create HttpSourceConfig.java to define your connector’s configuration:

package com.yourcompany.kafka.connect.http;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;

import java.util.Map;

public class HttpSourceConfig extends AbstractConfig {

    public static final String HTTP_URL = "http.url";
    private static final String HTTP_URL_DOC = "The URL to poll data from";

    public static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Topic to write to";

    public static final String POLL_INTERVAL_MS = "poll.interval.ms";
    private static final String POLL_INTERVAL_MS_DOC = "Interval in milliseconds to poll the HTTP URL";

    public static final String HTTP_HEADERS = "http.headers";
    private static final String HTTP_HEADERS_DOC = "Headers to include in the HTTP request";

    public static ConfigDef config() {
        return new ConfigDef()
            .define(HTTP_URL, Type.STRING, Importance.HIGH, HTTP_URL_DOC)
            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC)
            .define(POLL_INTERVAL_MS, Type.LONG, 5000L, Importance.MEDIUM, POLL_INTERVAL_MS_DOC)
            .define(HTTP_HEADERS, Type.STRING, "", Importance.LOW, HTTP_HEADERS_DOC);
    }

    public HttpSourceConfig(Map<String, String> parsedConfig) {
        super(config(), parsedConfig);
    }
}

Step 4: Implement the Connector Class

Create HttpSourceConnector.java:

package com.yourcompany.kafka.connect.http;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpSourceConnector extends SourceConnector {

    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HttpSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(configProps);
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Clean up resources if needed
    }

    @Override
    public ConfigDef config() {
        return HttpSourceConfig.config();
    }

    @Override
    public String version() {
        return "1.0";
    }
}
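One thing to note about taskConfigs(): it hands the same configuration to every task, so setting tasks.max above 1 would make several tasks poll the same URL and write duplicate records. Since this connector has a single endpoint, a reasonable variation (shown here only as a sketch, and assuming java.util.Collections is imported) is to ignore maxTasks and return one task configuration:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // A single endpoint means one task is enough; extra tasks would only duplicate data
    return Collections.singletonList(configProps);
}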

Step 5: Implement the Task Class

Create HttpSourceTask.java:

package com.yourcompany.kafka.connect.http;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class HttpSourceTask extends SourceTask {

    private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);

    private HttpSourceConfig config;
    private CloseableHttpClient httpClient;
    private long lastPollTime;
    private String url;
    private String topic;
    private long pollInterval;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        config = new HttpSourceConfig(props);
        httpClient = HttpClients.createDefault();
        url = config.getString(HttpSourceConfig.HTTP_URL);
        topic = config.getString(HttpSourceConfig.TOPIC_CONFIG);
        pollInterval = config.getLong(HttpSourceConfig.POLL_INTERVAL_MS);
        lastPollTime = 0;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastPollTime < pollInterval) {
            TimeUnit.MILLISECONDS.sleep(pollInterval - (currentTime - lastPollTime));
        }

        try {
            HttpGet request = new HttpGet(url);
            String response = httpClient.execute(request,
                httpResponse -> EntityUtils.toString(httpResponse.getEntity()));

            lastPollTime = System.currentTimeMillis();

            SourceRecord record = new SourceRecord(
                Collections.singletonMap("url", url),                // source partition
                Collections.singletonMap("timestamp", currentTime),  // source offset
                topic,     // target topic
                null,      // partition (null for default)
                null,      // key schema
                null,      // key
                null,      // value schema
                response   // value
            );

            return Collections.singletonList(record);
        } catch (IOException e) {
            log.error("Error polling HTTP endpoint", e);
            return null;
        }
    }

    @Override
    public void stop() {
        try {
            if (httpClient != null) {
                httpClient.close();
            }
        } catch (IOException e) {
            log.error("Error closing HTTP client", e);
        }
    }
}
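Note that the task above never actually uses the http.headers setting defined in Step 3. One way to wire it in, sketched below, assumes the headers are supplied as a comma-separated list of Name:Value pairs (that format is an assumption of this example, not something Kafka Connect prescribes):

// Hypothetical helper for HttpSourceTask; assumes http.headers looks like
// "Accept:application/json,X-Api-Key:abc123"
private void applyHeaders(HttpGet request, String headerConfig) {
    if (headerConfig == null || headerConfig.isEmpty()) {
        return;
    }
    for (String pair : headerConfig.split(",")) {
        String[] parts = pair.split(":", 2);
        if (parts.length == 2) {
            request.addHeader(parts[0].trim(), parts[1].trim());
        }
    }
}

You would then call applyHeaders(request, config.getString(HttpSourceConfig.HTTP_HEADERS)) in poll() before executing the request.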

Step 6: Build and Package the Connector

Run the following Maven command to build the connector:

mvn clean package

This will create a JAR file in the target directory.
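Keep in mind that the plain JAR produced by mvn clean package contains only your classes; the runtime dependencies (httpclient, jackson-databind, slf4j-api) must also end up on the Connect plugin path. One common approach, sketched below with the maven-shade-plugin (the version shown is only an example), is to build a self-contained JAR:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Alternatively, copy the dependency JARs alongside the connector JAR into its own subdirectory of the plugin path.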

Step 7: Deploy the Connector

To deploy your custom connector:

  1. Copy the JAR file to your Kafka Connect plugin path (usually /usr/share/java/kafka-connect or similar)
  2. Restart your Kafka Connect worker
  3. Create a connector configuration file (e.g., http-connector.properties):

name=http-source-connector
connector.class=com.yourcompany.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.url=https://api.example.com/data
topic=http_data
poll.interval.ms=60000

  4. Start the connector using the Kafka Connect REST API (example below).
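For the REST API, the same settings go into a JSON payload. A minimal example, assuming the Connect worker's REST interface is reachable at localhost:8083 (adjust host and port for your environment):

curl -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors \
  -d '{
    "name": "http-source-connector",
    "config": {
      "connector.class": "com.yourcompany.kafka.connect.http.HttpSourceConnector",
      "tasks.max": "1",
      "http.url": "https://api.example.com/data",
      "topic": "http_data",
      "poll.interval.ms": "60000"
    }
  }'

In standalone mode you can instead pass the properties file directly to connect-standalone along with the worker configuration.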

Advanced Considerations

  1. Error Handling: Implement robust error handling for HTTP timeouts, rate limiting, etc.
  2. Authentication: Add support for OAuth, API keys, or other auth mechanisms.
  3. Pagination: Handle paginated API responses.
  4. Schema Support: Add support for Avro or JSON schemas.
  5. Metrics: Add metrics collection to monitor connector performance.
  6. Backpressure: Implement backpressure handling for high-volume APIs.
  7. Incremental Loading: Track offsets to only fetch new data (see the sketch below).
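On the last point, Kafka Connect already persists the source offsets attached to each SourceRecord. A minimal sketch of reading the last committed offset back in HttpSourceTask.start(), so a restarted task can pick up where it left off (the "url" and "timestamp" keys match the partition and offset maps used in poll() above):

// In HttpSourceTask.start(): look up the last committed offset for this URL, if any
Map<String, Object> offset = context.offsetStorageReader()
        .offset(Collections.singletonMap("url", url));
if (offset != null && offset.get("timestamp") != null) {
    lastPollTime = (Long) offset.get("timestamp");
}

You could then pass lastPollTime to the API, for example as a modified-since query parameter, so that each poll only fetches new data.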

Conclusion

Building a custom HTTP source connector for Kafka gives you complete control over how data flows from REST APIs into your Kafka topics. While this example provides a basic implementation, you can extend it to handle more complex scenarios specific to your use case.

Remember to thoroughly test your connector under various failure scenarios and monitor its performance in production. The Kafka Connect framework provides a solid foundation, allowing you to focus on the business logic of your data integration needs.

Ready to Streamline Your Data Pipelines?

If you’re looking to implement custom Kafka connectors or build robust data streaming solutions, Alephys can help you architect a system tailored to your business needs and guide you through the process.

Whether you’re integrating complex APIs, optimizing data flow performance, or designing an enterprise-scale streaming architecture, our team of data experts will handle the technical heavy lifting. We help you unlock the full potential of real-time data while you focus on driving business value.
Author: Siva Munaga, Solution Architect at Alephys. I specialize in building scalable data infrastructure and streaming solutions that power modern applications. Let’s connect on LinkedIn to discuss your Kafka and data integration challenges!
