Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 7.0.1
- Invoke post_connection_check on connect [#61](https://github.com/logstash-plugins/logstash-output-tcp/pull/61)

## 7.0.0
- SSL settings that were marked deprecated in version `6.2.0` are now marked obsolete, and will prevent the plugin from starting.
[#58](https://github.com/logstash-plugins/logstash-output-tcp/pull/58)
Expand Down
48 changes: 25 additions & 23 deletions lib/logstash/outputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def run_as_client
client_socket = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket
client_socket = retryable_connect unless client_socket
while payload && payload.bytesize > 0
begin
written_bytes_size = client_socket.write_nonblock(payload)
Expand Down Expand Up @@ -316,32 +316,34 @@ def log_error(msg, e, backtrace: @logger.info?, **details)
@logger.error(msg, details)
end

private

def connect
begin
client_socket = TCPSocket.new(@host, @port)
if @ssl_enabled
client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
log_error 'connect ssl failure:', ssle, backtrace: false
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
raise
end
client_socket = TCPSocket.new(@host, @port)
if @ssl_enabled
client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
begin
client_socket.connect
client_socket.post_connection_check(@host) if @ssl_verification_mode == 'full'
rescue OpenSSL::SSL::SSLError => ssle
log_error 'connect ssl failure:', ssle, backtrace: false
client_socket.close rescue nil
raise
end
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("opened connection", :client => client_socket.peer)
return client_socket
rescue => e
log_error 'failed to connect:', e
sleep @reconnect_interval
retry
end
end # def connect
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("opened connection", :client => client_socket.peer)
client_socket
end

private
def retryable_connect
connect
rescue => e
log_error 'failed to connect:', e
sleep @reconnect_interval
retry
end

private
def validate_ssl_config!
unless @ssl_enabled
ignored_ssl_settings = original_params.select { |k| k != 'ssl_enabled' && k != 'ssl_enable' && k.start_with?('ssl_') }
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-tcp.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-tcp'
s.version = '7.0.0'
s.version = '7.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events over a TCP socket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
49 changes: 43 additions & 6 deletions spec/outputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
context "client mode" do
before { subject.register }

let(:config) { super().merge 'mode' => 'client' }
let(:config) { super().merge('mode' => 'client', 'host' => 'localhost') }

it 'writes payload data' do
Thread.start { sleep 0.25; subject.receive event }
Expand Down Expand Up @@ -248,28 +248,65 @@
expect( read ).to end_with 'foo bar'
end

context 'with ssl_verification_mode => full' do
let(:config) do
{
"mode" => "client",
"host" => "localhost",
"port" => port,
"ssl_enabled" => true,
"ssl_certificate_authorities" => crt_file,
"ssl_verification_mode" => "full",
"codec" => "plain"
}
end

context "with right host name" do
let(:config) { super().merge("host" => "localhost") }
it 'reads plain data' do
thread = Thread.start { sleep 0.25; subject.receive event }
socket = secure_server.accept
read = socket.sysread(100)
expect( read.size ).to be > 0
expect( read ).to end_with 'foo bar'
end
end

context "with wrong host name" do
let(:config) { super().merge("host" => "127.0.0.1") }
it 'closes the connection' do
thread = Thread.start do
sleep 0.25
expect { subject.connect }.to raise_error(OpenSSL::SSL::SSLError, /hostname "127.0.0.1" does not match the server certificate/)
end
secure_server.accept rescue nil # the other side will close the connection potentially causing a "Socket closed" error
thread.join
end
end
end

end

context 'with unsupported protocol (on server)' do

let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.1']) }
let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.1'], "reconnect_interval" => 1) }

let(:server_min_version) { 'TLS1_2' }

before { subject.register }
after { secure_server.close }

it 'fails (and loops retrying)' do
expect(subject.logger).to receive(:error).with(/connect ssl failure/i, hash_including(message: /No appropriate protocol/i)).and_call_original
expect(subject.logger).to receive(:error).with(/failed to connect/i, hash_including(exception: OpenSSL::SSL::SSLError)).and_call_original
expect(subject.logger).to receive(:error).twice.with(/connect ssl failure/i, hash_including(message: /No appropriate protocol/i)).and_call_original
expect(subject.logger).to receive(:error).twice.with(/failed to connect/i, hash_including(exception: OpenSSL::SSL::SSLError)).and_call_original
expect(subject).to receive(:sleep).once.and_call_original
expect(subject).to receive(:sleep).once.and_throw :TEST_DONE # to be able to abort the retry loop

Thread.start { secure_server.accept rescue nil }
expect { subject.receive event }.to throw_symbol(:TEST_DONE)
expect { sleep 0.25; subject.receive event }.to throw_symbol(:TEST_DONE)
end

end if LOGSTASH_VERSION > '7.0'
end

end

Expand Down