Indexer - Elasticsearch 8

Cloud Search Service Introduction

Cloud Search Service is a fully managed, one-stop information retrieval and analysis platform. It provides Elasticsearch and OpenSearch engines and supports full-text, vector, hybrid, and spatio-temporal search.

This component is an Elasticsearch 8.x indexer for Eino that implements the Indexer interface. It integrates with Eino’s vector storage and retrieval system for semantic search.

Features

  • Implements github.com/cloudwego/eino/components/indexer.Indexer
  • Easy integration with Eino indexing system
  • Configurable Elasticsearch parameters
  • Supports vector similarity search
  • Batch indexing operations
  • Custom field mapping support
  • Flexible document embedding

Installation

go get github.com/cloudwego/eino-ext/components/indexer/es8@latest

Quick Start

Here is a quick example of using the indexer. For more details, please read components/indexer/es8/examples/indexer/add_documents.go:

package main

import (
        "context"
        "fmt"
        "log"
        "os"

        "github.com/cloudwego/eino/schema"
        elasticsearch "github.com/elastic/go-elasticsearch/v8"

        "github.com/cloudwego/eino-ext/components/embedding/ark"
        "github.com/cloudwego/eino-ext/components/indexer/es8"
)

const (
        indexName          = "eino_example"
        fieldContent       = "content"
        fieldContentVector = "content_vector"
        fieldExtraLocation = "location"
        docExtraLocation   = "location"
)

func main() {
        ctx := context.Background()
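        // Elasticsearch supports several ways to connect; this example uses
        // basic auth plus an optional CA certificate.
        username := os.Getenv("ES_USERNAME")
        password := os.Getenv("ES_PASSWORD")

        // 1. Create ES client
        // cert is declared outside the if block so it is still in scope below;
        // it stays nil when ES_HTTP_CA_CERT_PATH is not set.
        var cert []byte
        if httpCACertPath := os.Getenv("ES_HTTP_CA_CERT_PATH"); httpCACertPath != "" {
                var err error
                cert, err = os.ReadFile(httpCACertPath)
                if err != nil {
                        log.Fatalf("read file failed, err=%v", err)
                }
        }

        client, err := elasticsearch.NewClient(elasticsearch.Config{
                Addresses: []string{"https://localhost:9200"},
                Username:  username,
                Password:  password,
                CACert:    cert,
        })
        if err != nil {
                log.Fatalf("create ES client failed, err=%v", err)
        }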

        // 2. Create embedding component
        // Using Volcengine Ark; set these environment variables to your real configuration
        emb, err := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
                APIKey: os.Getenv("ARK_API_KEY"),
                Region: os.Getenv("ARK_REGION"),
                Model:  os.Getenv("ARK_MODEL"),
        })
        if err != nil {
                log.Fatalf("create embedder failed, err=%v", err)
        }

        // 3. Prepare documents
        // Documents typically contain ID and Content. You can also add extra metadata for filtering and other purposes.
        docs := []*schema.Document{
                {
                        ID:      "1",
                        Content: "Eiffel Tower: Located in Paris, France.",
                        MetaData: map[string]any{
                                docExtraLocation: "France",
                        },
                },
                {
                        ID:      "2",
                        Content: "The Great Wall: Located in China.",
                        MetaData: map[string]any{
                                docExtraLocation: "China",
                        },
                },
        }

        // 4. Create ES indexer component
        indexer, err := es8.NewIndexer(ctx, &es8.IndexerConfig{
                Client:    client,
                Index:     indexName,
                BatchSize: 10,
                // DocumentToFields specifies how to map document fields to ES fields
                DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es8.FieldValue, err error) {
                        return map[string]es8.FieldValue{
                                fieldContent: {
                                        Value:    doc.Content,
                                        EmbedKey: fieldContentVector, // Embed document content and save to "content_vector" field
                                },
                                fieldExtraLocation: {
                                        // Extra metadata field
                                        Value: doc.MetaData[docExtraLocation],
                                },
                        }, nil
                },
                // Provide embedding component for vectorization
                Embedding: emb,
        })
        if err != nil {
                log.Fatalf("create indexer failed, err=%v", err)
        }

        // 5. Index documents
        ids, err := indexer.Store(ctx, docs)
        if err != nil {
                fmt.Printf("index error: %v\n", err)
                return
        }
        fmt.Println("indexed ids:", ids)
}
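
The quick start sets BatchSize to 10, which the Configuration section describes as the maximum number of texts for embedding. The sketch below illustrates one way such batching could work, assuming the limit applies per embedding call. The helper name chunkTexts is hypothetical and is not part of the es8 package; the indexer's actual internal batching is not shown in this document.

```go
package main

import "fmt"

// chunkTexts is a hypothetical helper (not part of es8). It splits texts into
// consecutive batches of at most batchSize items. A non-positive batchSize
// falls back to 5, the default documented for IndexerConfig.BatchSize.
func chunkTexts(texts []string, batchSize int) [][]string {
	if batchSize <= 0 {
		batchSize = 5
	}
	var batches [][]string
	for start := 0; start < len(texts); start += batchSize {
		end := start + batchSize
		if end > len(texts) {
			end = len(texts)
		}
		batches = append(batches, texts[start:end])
	}
	return batches
}

func main() {
	// Twelve placeholder texts standing in for document contents.
	texts := make([]string, 12)
	for i := range texts {
		texts[i] = fmt.Sprintf("doc-%d", i+1)
	}
	for i, batch := range chunkTexts(texts, 5) {
		fmt.Printf("batch %d: %d texts\n", i+1, len(batch))
	}
}
```

With 12 texts and a batch size of 5, this yields batches of 5, 5, and 2 texts. Under this assumption, a larger BatchSize means fewer embedding calls but more texts per call.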

Configuration

Configure the indexer using the IndexerConfig struct:

type IndexerConfig struct {
    Client *elasticsearch.Client // Required: Elasticsearch client instance
    Index  string                // Required: Index name to store documents
    BatchSize int                // Optional: Maximum number of texts for embedding (default: 5)

    // Required: Function to map Document fields to Elasticsearch fields
    DocumentToFields func(ctx context.Context, doc *schema.Document) (map[string]FieldValue, error)

    // Optional: Only required when vectorization is needed
    Embedding embedding.Embedder
}

// FieldValue defines how a field should be stored and vectorized
type FieldValue struct {
    Value     any    // Raw value to store
    EmbedKey  string // If set, Value is vectorized and the vector is saved under this field name
    Stringify func(val any) (string, error) // Optional: Custom string conversion
}
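
The Stringify field suggests that a non-string Value can be converted to text before it is vectorized. Here is a minimal sketch of such a converter. It declares a local copy of FieldValue so that it compiles without the es8 package, and both the stringifyTags helper and the "tags_vector" field name are hypothetical. How the indexer invokes Stringify internally is not shown in this document, so the direct call in main is only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// FieldValue is a local copy of the struct shown above, declared here only so
// the sketch compiles on its own.
type FieldValue struct {
	Value     any
	EmbedKey  string
	Stringify func(val any) (string, error)
}

// stringifyTags is a hypothetical converter: it joins a []string into one
// comma-separated string and returns an error for any other type.
func stringifyTags(val any) (string, error) {
	tags, ok := val.([]string)
	if !ok {
		return "", fmt.Errorf("expected []string, got %T", val)
	}
	return strings.Join(tags, ", "), nil
}

func main() {
	fv := FieldValue{
		Value:     []string{"landmark", "europe"},
		EmbedKey:  "tags_vector", // hypothetical vector field name
		Stringify: stringifyTags,
	}
	// Call the converter directly to show the text that would be embedded.
	text, err := fv.Stringify(fv.Value)
	if err != nil {
		fmt.Println("stringify error:", err)
		return
	}
	fmt.Println(text)
}
```

Returning an error for unexpected types, rather than falling back to a default format, surfaces a field-mapping mistake early.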

Getting Help

If you have any questions or feature suggestions, feel free to join the oncall group.


Last modified January 20, 2026: feat(eino): sync En docs with zh docs (9da8ff724c)