0

I am currently using elasticsearch 2.3.4 and logstash 2.3.4 to load relational data from Oracle db into my elasticsearch index using logstash-jdbc plugin. As suggested in various posts, I am using aggregate filter for this. Still I am not able to load the inner nested object in the document. The values are not getting mapped to fields and are displayed as NULL.

I have two related entities with following data:

    CREATE TABLE DEPARTMENT (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL
    )

    CREATE TABLE EMPLOYEE (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL,
        departmentid NUMBER,
        CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
    ) 


    insert into DEPARTMENT values (1, 'dept1');
    insert into DEPARTMENT values (2, 'dept2');
    insert into DEPARTMENT values (3, 'dept3');
    insert into DEPARTMENT values (4, 'dept4');

    insert into EMPLOYEE values (1, 'emp1', 1);
    insert into EMPLOYEE values (2, 'emp2', 1);
    insert into EMPLOYEE values (3, 'emp3', 1);
    insert into EMPLOYEE values (4, 'emp4', 2);
    insert into EMPLOYEE values (5, 'emp5', 2);
    insert into EMPLOYEE values (6, 'emp6', 3);

Here is my mapping:

   {
        "mappings": {
            "departments": {
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    "deptName": {
                        "type": "string"
                    },          
                    "employee_details": {
                        "type": "nested",
                        "properties": {
                            "empId": {
                                "type": "integer"
                            },
                            "empName": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        }
    }

And this is my logstash configuration:

  input{
        # JDBC input: pulls the department/employee join from Oracle.
        # NOTE: by default this input lowercases all column aliases, so the
        # mixed-case aliases below (deptName, empId, empName) arrive on the
        # event as 'deptname', 'empid', 'empname' — see the rubydebug output
        # further down. The filter must read the lowercase names.
        jdbc{
            jdbc_validate_connection => true
            jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
            jdbc_user => "user"
            jdbc_password => "pwd"
            jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
            # The Java:: prefix is the JRuby-style class reference used for the
            # Oracle thin driver.
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            # LEFT JOIN so departments without employees still produce a row;
            # ORDER BY id keeps rows for the same department adjacent, which
            # the aggregate filter's push_previous_map_as_event relies on.
            statement => "SELECT 
                            department.id AS id,
                            department.name AS deptName,
                            employee.id AS empId,
                            employee.name AS empName
                        FROM  department LEFT JOIN employee  
                        ON department.id = employee.departmentid
                        ORDER BY id"
        }
    }

    filter{
        aggregate {
            # Fold all rows sharing the same department id into one event.
            task_id => "%{id}"
            # FIX 1: the jdbc input lowercases column aliases, so the event
            # fields are 'deptname'/'empid'/'empname' (see the rubydebug output
            # below), not the mixed-case SQL aliases. Reading the mixed-case
            # names returned nil — the cause of the null values in the index.
            # The map keys stay camelCase to match the Elasticsearch mapping.
            # FIX 2: event['empId] was missing its closing quote.
            code => "
            map['id'] = event['id']
            map['deptName'] = event['deptname']
            map['employee_details'] ||= []
            map['employee_details'] << {'empId' => event['empid'], 'empName' => event['empname'] }
            "

            # Emit the finished map as soon as a row with a new task_id arrives;
            # valid here because the query is ORDER BY id.
            push_previous_map_as_event => true
            timeout => 5
            timeout_tags => ['aggregated']
        }       
    }

    output{
    # Echo every event to stdout; the rubydebug codec prints the full event
    # structure (used for the snippets shown below).
    stdout{ codec => rubydebug }
        # Index each aggregated event, keyed by department id so re-runs
        # overwrite the same document instead of creating duplicates.
        elasticsearch{
            action => "index"
            index => "my_index"
            document_type => "departments"
            document_id => "%{id}"
            hosts => "localhost:9200"
        }
    }

When I perform a GET on all documents: curl -XGET 'localhost:9200/my_index/_search?pretty=true&q=*:*'

The values are not mapped to fields and displayed as NULL:

      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "2",
            "_score": 1,
            "_source": {
              "id": 2,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "4",
            "_score": 1,
            "_source": {
              "id": 4,
              "deptname": "dept4",
              "empid": null,
              "empname": null,
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.367Z",
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "1",
            "_score": 1,
            "_source": {
              "id": 1,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "3",
            "_score": 1,
            "_source": {
              "id": 3,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.492Z",
              "tags": [
                "aggregated"
              ]
            }
          }
        ]
      }
    }

rubydebug suggests the values are set to 'nil'. Could anyone please help me with what I am doing wrong here?

Here is a snippet from stdout for document with id = 1:

{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 1.0,
       "empname" => "emp1",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:14.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 2.0,
       "empname" => "emp2",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 3.0,
       "empname" => "emp3",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
                  "id" => 1.0,
            "deptName" => nil,
    "employee_details" => [
        [0] {
              "empId" => nil,
            "empName" => nil
        },
        [1] {
              "empId" => nil,
            "empName" => nil
        },
        [2] {
              "empId" => nil,
            "empName" => nil
        }
    ],
            "@version" => "1",
          "@timestamp" => "2019-05-14T12:32:15.381Z",
                "tags" => [
        [0] "aggregated"
    ]
}
1
  • 1
    If anyone stumbles upon this post looking for a solution: this configuration works exactly as it should, except that every field name must be lowercase, e.g. map['deptname'] = event['deptname']. I got help from this post: discuss.elastic.co/t/… Commented May 14, 2019 at 17:42

2 Answers 2

0

Following code works for me .

input {
    # MySQL source: one row per employee/title/department combination; the
    # aggregate filter below collapses these into one event per employee.
    jdbc{
        jdbc_validate_connection => true
        jdbc_connection_string => "----/employees"
        jdbc_user => "---"
        jdbc_password => "--"
        jdbc_driver_library => "/home/ilsa/mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # ORDER BY e.emp_no keeps rows for one employee adjacent — required
        # for push_previous_map_as_event to emit complete aggregates.
        statement => "SELECT  
            e.emp_no as employee_number, 
            birth_date, first_name, last_name, gender, hire_date, t.title  AS titlename, 
            t.from_date AS titlefrom_date, t.to_date AS titleto_date, d.dept_no AS departmentnumber, 
            ds.dept_name AS departmentname, d.from_date AS departmentfrom_date, d.to_date AS departmentto_date 
        FROM employees e 
        LEFT JOIN(titles t, dept_emp d, departments ds) 
        ON(e.emp_no = t.emp_no AND e.emp_no = d.emp_no AND d.dept_no = ds.dept_no AND t.from_date < d.to_date AND t.to_date >   d.from_date) 
ORDER BY e.emp_no ASC"
    }

}
filter {
    aggregate {
        task_id => "%{employee_number}"
        # Uses the event.get(...) accessor API (rather than event[...]), and
        # all lowercase field names, which is why this version maps correctly.
        # event.cancel() drops the raw per-row events so only the aggregated
        # event (pushed when the next employee_number appears) reaches output.
        code => "
            map['employee_number'] = event.get('employee_number')
            map['birth_date'] = event.get('birth_date')
            map['first_name'] = event.get('first_name')
            map['last_name'] = event.get('last_name')
            map['gender'] = event.get('gender')
            map['hire_date'] = event.get('hire_date')
            map['roles'] ||= []
            map['roles'] << {

                'title.name' => event.get('titlename'),
                'title.from_date' => event.get('titlefrom_date'),
                'title.to_date' => event.get('titleto_date'),
                'department.number' => event.get('departmentnumber'),
                'department.name' => event.get('departmentname'),
                'department.from_date' => event.get('departmentfrom_date'),
                'department.to_date' => event.get('departmentto_date')
            }
        event.cancel()"
        push_previous_map_as_event => true
        timeout => 30
    }
}
output {
    stdout{ codec => rubydebug }
    # employee_number as document_id makes re-indexing idempotent.
    elasticsearch{
        action => "index"
        index => "employees"
        document_type => "employee"
        document_id => "%{employee_number}"
        hosts => "localhost:9200"
    }

}
Sign up to request clarification or add additional context in comments.

Comments

0

You can also try to make use of jdbc streaming in logstash filter plugin. Check this post Inserting Nested Objects using Logstash

For example, I am taking Stackoverflow Posts and Users as an example. Here Post is parent table and it is associated with Users table on OwnerUserId. So my plugin configuration is

    input {
    jdbc {
           jdbc_driver_library => "/usr/share/logstash/javalib/mssql-jdbc-8.2.2.jre11.jar"
           jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
           # NOTE(review): credentials appear both inside the connection string
           # and in jdbc_user/jdbc_password below — redundant, and plaintext
           # secrets in config should ideally come from the keystore/env.
           jdbc_connection_string => "jdbc:sqlserver://host.docker.internal;database=StackOverflow2010;user=pavan;password=pavankumar@123"
           jdbc_user => "pavan"
           jdbc_password => "pavankumar@123"
           statement => "select top 500 * from StackOverflow2010.dbo.Posts p "
        }
}

filter{
    # Per-event lookup: for each Post, fetch its owning User row and attach
    # the result under the 'user_details' field — an alternative to the
    # aggregate-filter approach for building nested objects.
    jdbc_streaming {
    jdbc_driver_library => "/usr/share/logstash/javalib/mssql-jdbc-8.2.2.jre11.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://host.docker.internal;database=StackOverflow2010;user=pavan;password=pavankumar@123"
    jdbc_user => "pavan"
    jdbc_password => "pavankumar@123"
    # :owneruserid is bound from the event field named in 'parameters'.
    statement => "select * from StackOverflow2010.dbo.Users u where u.Id = :owneruserid"
    parameters => {"owneruserid" => "owneruserid"}
    target => "user_details"
    }
}

output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200", "http://elasticsearch:9200"]
        index => "stackoverflow_top_user"
    }
    stdout {
        codec => rubydebug
    }
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.